diff --git a/jdbc/src/main/java/tech/ydb/jdbc/YdbStatement.java b/jdbc/src/main/java/tech/ydb/jdbc/YdbStatement.java index e9224b1..a619b79 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/YdbStatement.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/YdbStatement.java @@ -43,6 +43,4 @@ public interface YdbStatement extends Statement { @Override YdbConnection getConnection() throws SQLException; - - void waitReady() throws SQLException; } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java index 04e1f5b..1a93cb6 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java @@ -9,8 +9,6 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; import tech.ydb.common.transaction.TxMode; import tech.ydb.core.Issue; @@ -43,8 +41,6 @@ * @author Aleksandr Gorshenin */ public class QueryServiceExecutor extends BaseYdbExecutor { - private static final Logger LOGGER = Logger.getLogger(QueryServiceExecutor.class.getName()); - private final Duration sessionTimeout; private final QueryClient queryClient; @@ -73,7 +69,6 @@ protected QuerySession createNewQuerySession(YdbValidator validator) throws SQLE Result result = queryClient.createSession(sessionTimeout).join(); validator.addStatusIssues(result.getStatus()); QuerySession session = result.getValue(); - LOGGER.log(Level.FINEST, "Acquired session {0}", session); return session; } catch (UnexpectedResultException ex) { throw ExceptionFactory.createException("Cannot create session with " + ex.getStatus(), ex); @@ -89,7 +84,6 @@ public void close() throws SQLException { private void cleanTx() { if (tx != null) { - LOGGER.log(Level.FINEST, "Released session {0}", tx.getSession()); tx.getSession().close(); tx = null; } @@ -205,7 +199,7 @@ public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException } RollbackTransactionSettings settings = ctx.withRequestTimeout(RollbackTransactionSettings.newBuilder()) - .build(); + .build(); try { validator.clearWarnings(); @@ -371,5 +365,4 @@ private static TxMode txMode(int level, boolean isReadOnly) throws SQLException throw new SQLException(YdbConst.UNSUPPORTED_TRANSACTION_LEVEL + level); } } - } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/StreamQueryResult.java b/jdbc/src/main/java/tech/ydb/jdbc/context/StreamQueryResult.java index 2a2f5af..49da5dd 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/StreamQueryResult.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/StreamQueryResult.java @@ -11,7 +11,10 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; import tech.ydb.core.Issue; import tech.ydb.core.Result; @@ -37,6 +40,8 @@ * @author Aleksandr Gorshenin */ public class StreamQueryResult implements YdbQueryResult { + private static final Logger LOGGER = Logger.getLogger(StreamQueryResult.class.getName()); + private static final int DDL_EXPRESSION = -1; private static final int UPDATE_EXPRESSION = -2; @@ -44,13 +49,15 @@ public class StreamQueryResult implements YdbQueryResult { private final YdbStatement statement; private final Runnable stopRunnable; - private final CompletableFuture finishFuture = new CompletableFuture<>(); + private final CompletableFuture streamFuture = new CompletableFuture<>(); private final CompletableFuture> startFuture = new CompletableFuture<>(); private final int[] resultIndexes; private final List>> resultFutures = new ArrayList<>(); + private final AtomicBoolean streamCancelled = new AtomicBoolean(false); private int resultIndex = 0; + private volatile boolean resultClosed = false; public StreamQueryResult(String msg, YdbStatement statement, YdbQuery query, Runnable stopRunnable) { this.msg = msg; @@ -78,6 +85,7 @@ public StreamQueryResult(String msg, YdbStatement statement, YdbQuery query, Run } public CompletableFuture> execute(QueryStream stream, Runnable finish) { + LOGGER.log(Level.FINE, "Stream executed by QueryStream"); stream.execute(new QueryPartsHandler()) .thenApply(Result::getStatus) .whenComplete(this::onStreamFinished) @@ -88,7 +96,8 @@ public CompletableFuture> execute(QueryStream stream, public CompletableFuture> execute( GrpcReadStream stream, Runnable finish ) { - stream.start(rsr -> onResultSet(0, rsr)) + LOGGER.log(Level.FINE, "Stream executed by ScanQuery"); + stream.start(new ScanQueryHandler()) .whenComplete(this::onStreamFinished) .thenRun(finish); return startFuture; @@ -96,7 +105,7 @@ public CompletableFuture> execute( private void onStreamFinished(Status status, Throwable th) { if (th != null) { - finishFuture.completeExceptionally(th); + streamFuture.completeExceptionally(th); for (CompletableFuture> future: resultFutures) { future.completeExceptionally(th); } @@ -104,7 +113,7 @@ private void onStreamFinished(Status status, Throwable th) { } if (status != null) { - finishFuture.complete(status); + streamFuture.complete(status); if (status.isSuccess()) { for (CompletableFuture> future: resultFutures) { future.complete(Result.success(new LazyResultSet(statement, new ColumnInfo[0]), status)); @@ -128,9 +137,49 @@ private void onStreamFinished(Status status, Throwable th) { } } + private void closeResultSet(int index) throws SQLException { + try { + CompletableFuture> future = resultFutures.get(index); + if (future != null) { + future.join().getValue().close(); + } + } catch (UnexpectedResultException ex) { + throw ExceptionFactory.createException("Cannot call '" + msg + "' with " + ex.getStatus(), ex); + } + } + + private boolean isStreamStopped() { + if (!resultClosed) { + return false; + } + + if (!streamFuture.isDone() && streamCancelled.compareAndSet(false, true)) { + LOGGER.log(Level.FINE, "Stream cancel"); + stopRunnable.run(); + } + + return true; + } + @Override public void close() throws SQLException { - Status status = finishFuture.join(); + if (startFuture.isDone() && resultClosed) { + return; + } + + LOGGER.log(Level.FINE, "Stream closing"); + + resultClosed = true; + Status status = streamFuture.join(); + + statement.getValidator().addStatusIssues(status); + + if (streamCancelled.get()) { + LOGGER.log(Level.FINE, "Stream canceled and finished with status {0}", status); + return; + } + + LOGGER.log(Level.FINE, "Stream closed with status {0}", status); if (!status.isSuccess()) { throw ExceptionFactory.createException("Cannot execute '" + msg + "' with " + status, new UnexpectedResultException("Unexpected status", status) @@ -178,17 +227,6 @@ public boolean hasResultSets() throws SQLException { return resultIndexes[resultIndex] >= 0; } - private void closeResultSet(int index) throws SQLException { - try { - CompletableFuture> future = resultFutures.get(index); - if (future != null) { - future.join().getValue().close(); - } - } catch (UnexpectedResultException ex) { - throw ExceptionFactory.createException("Cannot call '" + msg + "' with " + ex.getStatus(), ex); - } - } - @Override public boolean getMoreResults(int current) throws SQLException { if (resultFutures == null || resultIndex >= resultFutures.size()) { @@ -225,25 +263,28 @@ private void onResultSet(int index, ResultSetReader rsr) { Result res = future.join(); if (res.isSuccess()) { - try { - res.getValue().addResultSet(rsr); - } catch (InterruptedException ex) { - stopRunnable.run(); - } + res.getValue().addResultSet(rsr); + } + } + + private class ScanQueryHandler implements GrpcReadStream.Observer { + @Override + public void onNext(ResultSetReader part) { + onResultSet(0, part); + startFuture.complete(Result.success(StreamQueryResult.this)); } } private class QueryPartsHandler implements QueryStream.PartsHandler { @Override public void onIssues(Issue[] issues) { - startFuture.complete(Result.success(StreamQueryResult.this)); statement.getValidator().addStatusIssues(Arrays.asList(issues)); } @Override public void onNextPart(QueryResultPart part) { - startFuture.complete(Result.success(StreamQueryResult.this)); onResultSet((int) part.getResultSetIndex(), part.getResultSetReader()); + startFuture.complete(Result.success(StreamQueryResult.this)); } } @@ -267,13 +308,25 @@ public void cleanQueue() { } } - public void addResultSet(ResultSetReader rsr) throws InterruptedException { - if (isClosed) { + public void addResultSet(ResultSetReader rsr) { + try { + do { + if (isStreamStopped()) { + close(); + return; + } + } while (!readers.offer(rsr, 100, TimeUnit.MILLISECONDS)); + } catch (InterruptedException ex) { + if (streamFuture.completeExceptionally(ex)) { + LOGGER.log(Level.WARNING, "LazyResultSet offer interrupted"); + stopRunnable.run(); + } return; } - if (readers.offer(rsr, 60, TimeUnit.SECONDS)) { - rowsCount.addAndGet(rsr.getRowCount()); - } + + long total = rowsCount.addAndGet(rsr.getRowCount()); + LOGGER.log(Level.FINEST, "LazyResultSet got {0} rows", total); + if (isClosed) { cleanQueue(); } @@ -322,10 +375,6 @@ public void complete() { @Override public void close() { - if (isClosed) { - return; - } - isClosed = true; current = null; cleanQueue(); diff --git a/jdbc/src/main/java/tech/ydb/jdbc/impl/BaseYdbStatement.java b/jdbc/src/main/java/tech/ydb/jdbc/impl/BaseYdbStatement.java index 35da0dc..5f42cd7 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/impl/BaseYdbStatement.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/impl/BaseYdbStatement.java @@ -55,6 +55,10 @@ public BaseYdbStatement(Logger logger, YdbConnection connection, int resultSetTy this.bulkQueryTxMode = props.getBulkQueryTxMode(); } + private void ensureOpened() throws SQLException { + connection.getExecutor().ensureOpened(); + } + @Override public YdbValidator getValidator() { return validator; @@ -84,7 +88,8 @@ public int getResultSetType() { } @Override - public SQLWarning getWarnings() { + public SQLWarning getWarnings() throws SQLException { + ensureOpened(); return validator.toSQLWarnings(); } @@ -100,7 +105,6 @@ public int getQueryTimeout() { @Override public void setQueryTimeout(int seconds) throws SQLException { - ensureOpened(); queryTimeout = seconds; } @@ -126,36 +130,20 @@ public void setMaxRows(int max) { @Override public YdbResultSet getResultSet() throws SQLException { - ensureOpened(); return state.getCurrentResultSet(); } @Override public boolean getMoreResults(int current) throws SQLException { - ensureOpened(); return state.getMoreResults(current); } @Override public int getUpdateCount() throws SQLException { - ensureOpened(); return state.getUpdateCount(); } - private void ensureOpened() throws SQLException { - if (isClosed) { - throw new SQLException(YdbConst.CLOSED_CONNECTION); - } - } - - @Override - public void waitReady() throws SQLException { - state.close(); - } - protected void cleanState() throws SQLException { - ensureOpened(); - state.close(); state = YdbQueryResult.EMPTY; @@ -169,7 +157,7 @@ protected boolean updateState(YdbQueryResult result) throws SQLException { protected YdbQueryResult executeBulkUpsert(YdbQuery query, String tablePath, ListValue rows) throws SQLException { - connection.getExecutor().ensureOpened(); + ensureOpened(); if (connection.getExecutor().isInsideTransaction()) { switch (bulkQueryTxMode) { @@ -188,12 +176,12 @@ protected YdbQueryResult executeBulkUpsert(YdbQuery query, String tablePath, Lis } protected YdbQueryResult executeExplainQuery(YdbQuery query) throws SQLException { - connection.getExecutor().ensureOpened(); + ensureOpened(); return connection.getExecutor().executeExplainQuery(this, query); } protected YdbQueryResult executeDataQuery(YdbQuery query, String yql, Params params) throws SQLException { - connection.getExecutor().ensureOpened(); + ensureOpened(); YdbContext ctx = connection.getCtx(); if (ctx.queryStatsEnabled()) { @@ -212,7 +200,7 @@ protected YdbQueryResult executeDataQuery(YdbQuery query, String yql, Params par } protected YdbQueryResult executeSchemeQuery(YdbQuery query) throws SQLException { - connection.getExecutor().ensureOpened(); + ensureOpened(); if (connection.getExecutor().isInsideTransaction()) { switch (schemeQueryTxMode) { @@ -231,7 +219,7 @@ protected YdbQueryResult executeSchemeQuery(YdbQuery query) throws SQLException } protected YdbQueryResult executeScanQuery(YdbQuery query, String yql, Params params) throws SQLException { - connection.getExecutor().ensureOpened(); + ensureOpened(); if (connection.getExecutor().isInsideTransaction()) { switch (scanQueryTxMode) { @@ -297,7 +285,6 @@ public void cancel() { @Override public boolean getMoreResults() throws SQLException { - ensureOpened(); return getMoreResults(Statement.KEEP_CURRENT_RESULT); } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbConnectionImpl.java b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbConnectionImpl.java index 266b056..2487216 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbConnectionImpl.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/impl/YdbConnectionImpl.java @@ -282,7 +282,6 @@ public String getSchema() { @Override public int getNetworkTimeout() throws SQLException { - executor.ensureOpened(); return (int) ctx.getOperationProperties().getDeadlineTimeout().toMillis(); } diff --git a/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbConnectionImplTest.java b/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbConnectionImplTest.java index c08ec75..66585b0 100644 --- a/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbConnectionImplTest.java +++ b/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbConnectionImplTest.java @@ -10,6 +10,8 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.Random; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -18,6 +20,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -861,6 +864,88 @@ public void testLiteralQuery() throws SQLException { } } + private String createPayload(Random rnd, int length) { + StringBuilder sb = new StringBuilder(length); + for (int i = 0; i < length; i++) { + sb.append((char)('0' + rnd.nextInt(75))); + } + return sb.toString(); + } + + @Test + @Timeout(value = 30, unit = TimeUnit.SECONDS, threadMode = Timeout.ThreadMode.SAME_THREAD) + public void testBigBulkAndScan() throws SQLException { + String bulkUpsert = QUERIES.upsertOne(SqlQueries.JdbcQuery.BULK, "c_Text", "Text?"); + String scanSelectAll = QUERIES.scanSelectSQL(); + String selectOne = QUERIES.selectAllByKey("?"); + + Random rnd = new Random(0x234567); + int payloadLength = 1000; + + try { + // BULK UPSERT + try (PreparedStatement ps = jdbc.connection().prepareStatement(bulkUpsert)) { + for (int idx = 1; idx <= 10000; idx++) { + ps.setInt(1, idx); + String payload = createPayload(rnd, payloadLength); + ps.setString(2, payload); + ps.addBatch(); + if (idx % 1000 == 0) { + ps.executeBatch(); + } + } + ps.executeBatch(); + } + + // SCAN all table + try (PreparedStatement ps = jdbc.connection().prepareStatement(scanSelectAll)) { + int readed = 0; + Assertions.assertTrue(ps.execute()); + try (ResultSet rs = ps.getResultSet()) { + while (rs.next()) { + readed++; + Assertions.assertEquals(readed, rs.getInt("key")); + Assertions.assertEquals(payloadLength, rs.getString("c_Text").length()); + } + } + Assertions.assertEquals(10000, readed); + } + + // Canceled scan + try (PreparedStatement ps = jdbc.connection().prepareStatement(scanSelectAll)) { + Assertions.assertTrue(ps.execute()); + ps.getResultSet().next(); + ps.getResultSet().close(); + + SQLWarning w = ps.getWarnings(); + Assertions.assertNotNull(w); + Assertions.assertEquals("gRPC error: (CANCELLED) Cancelled on user request (S_ERROR)", w.getMessage()); + + w = w.getNextWarning(); + Assertions.assertNotNull(w); + Assertions.assertEquals("java.util.concurrent.CancellationException (S_ERROR)", w.getMessage()); + + w = w.getNextWarning(); + Assertions.assertNull(w); + } + + // Scan was cancelled, but connection still work + try (PreparedStatement ps = jdbc.connection().prepareStatement(selectOne)) { + ps.setInt(1, 1234); + + Assertions.assertTrue(ps.execute()); + try (ResultSet rs = ps.getResultSet()) { + Assertions.assertTrue(rs.next()); + Assertions.assertEquals(1234, rs.getInt("key")); + Assertions.assertEquals(payloadLength, rs.getString("c_Text").length()); + Assertions.assertFalse(rs.next()); + } + } + } finally { + cleanTable(); + } + } + @Test public void testAnsiLexer() throws SQLException { try (Statement statement = jdbc.connection().createStatement()) { diff --git a/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbQueryConnectionImplTest.java b/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbQueryConnectionImplTest.java index 82431de..24a1a6c 100644 --- a/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbQueryConnectionImplTest.java +++ b/jdbc/src/test/java/tech/ydb/jdbc/impl/YdbQueryConnectionImplTest.java @@ -10,6 +10,8 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.Random; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -19,6 +21,7 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -876,6 +879,88 @@ public void testLiteralQuery() throws SQLException { } } + private String createPayload(Random rnd, int length) { + StringBuilder sb = new StringBuilder(length); + for (int i = 0; i < length; i++) { + sb.append((char)('0' + rnd.nextInt(75))); + } + return sb.toString(); + } + + @Test + @Timeout(value = 30, unit = TimeUnit.SECONDS, threadMode = Timeout.ThreadMode.SAME_THREAD) + public void testBigBulkAndScan() throws SQLException { + String bulkUpsert = QUERIES.upsertOne(SqlQueries.JdbcQuery.BULK, "c_Text", "Text?"); + String scanSelectAll = QUERIES.scanSelectSQL(); + String selectOne = QUERIES.selectAllByKey("?"); + + Random rnd = new Random(0x234567); + int payloadLength = 1000; + + try { + // BULK UPSERT + try (PreparedStatement ps = jdbc.connection().prepareStatement(bulkUpsert)) { + for (int idx = 1; idx <= 10000; idx++) { + ps.setInt(1, idx); + String payload = createPayload(rnd, payloadLength); + ps.setString(2, payload); + ps.addBatch(); + if (idx % 1000 == 0) { + ps.executeBatch(); + } + } + ps.executeBatch(); + } + + // SCAN all table + try (PreparedStatement ps = jdbc.connection().prepareStatement(scanSelectAll)) { + int readed = 0; + Assertions.assertTrue(ps.execute()); + try (ResultSet rs = ps.getResultSet()) { + while (rs.next()) { + readed++; + Assertions.assertEquals(readed, rs.getInt("key")); + Assertions.assertEquals(payloadLength, rs.getString("c_Text").length()); + } + } + Assertions.assertEquals(10000, readed); + } + + // Canceled scan + try (PreparedStatement ps = jdbc.connection().prepareStatement(scanSelectAll)) { + Assertions.assertTrue(ps.execute()); + ps.getResultSet().next(); + ps.getResultSet().close(); + + SQLWarning w = ps.getWarnings(); + Assertions.assertNotNull(w); + Assertions.assertEquals("gRPC error: (CANCELLED) Cancelled on user request (S_ERROR)", w.getMessage()); + + w = w.getNextWarning(); + Assertions.assertNotNull(w); + Assertions.assertEquals("java.util.concurrent.CancellationException (S_ERROR)", w.getMessage()); + + w = w.getNextWarning(); + Assertions.assertNull(w); + } + + // Scan was cancelled, but connection still work + try (PreparedStatement ps = jdbc.connection().prepareStatement(selectOne)) { + ps.setInt(1, 1234); + + Assertions.assertTrue(ps.execute()); + try (ResultSet rs = ps.getResultSet()) { + Assertions.assertTrue(rs.next()); + Assertions.assertEquals(1234, rs.getInt("key")); + Assertions.assertEquals(payloadLength, rs.getString("c_Text").length()); + Assertions.assertFalse(rs.next()); + } + } + } finally { + cleanTable(); + } + } + @Test public void testAnsiLexer() throws SQLException { try (Statement statement = jdbc.connection().createStatement()) { diff --git a/jdbc/src/test/resources/sql/select.sql b/jdbc/src/test/resources/sql/select.sql index e3469b0..58c4f01 100644 --- a/jdbc/src/test/resources/sql/select.sql +++ b/jdbc/src/test/resources/sql/select.sql @@ -27,4 +27,4 @@ select c_Interval, c_Decimal -from #tableName +from #tableName order by key diff --git a/pom.xml b/pom.xml index 79bc4c5..2f8ab3b 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ 1.7.36 5.9.3 - 2.3.0 + 2.3.1