Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed stream reading for scan queries #70

Merged
merged 3 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions jdbc/src/main/java/tech/ydb/jdbc/YdbStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,4 @@ public interface YdbStatement extends Statement {

@Override
YdbConnection getConnection() throws SQLException;

void waitReady() throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import tech.ydb.common.transaction.TxMode;
import tech.ydb.core.Issue;
Expand Down Expand Up @@ -43,8 +41,6 @@
* @author Aleksandr Gorshenin
*/
public class QueryServiceExecutor extends BaseYdbExecutor {
private static final Logger LOGGER = Logger.getLogger(QueryServiceExecutor.class.getName());

private final Duration sessionTimeout;
private final QueryClient queryClient;

Expand Down Expand Up @@ -73,7 +69,6 @@ protected QuerySession createNewQuerySession(YdbValidator validator) throws SQLE
Result<QuerySession> result = queryClient.createSession(sessionTimeout).join();
validator.addStatusIssues(result.getStatus());
QuerySession session = result.getValue();
LOGGER.log(Level.FINEST, "Acquired session {0}", session);
return session;
} catch (UnexpectedResultException ex) {
throw ExceptionFactory.createException("Cannot create session with " + ex.getStatus(), ex);
Expand All @@ -89,7 +84,6 @@ public void close() throws SQLException {

private void cleanTx() {
if (tx != null) {
LOGGER.log(Level.FINEST, "Released session {0}", tx.getSession());
tx.getSession().close();
tx = null;
}
Expand Down Expand Up @@ -205,7 +199,7 @@ public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException
}

RollbackTransactionSettings settings = ctx.withRequestTimeout(RollbackTransactionSettings.newBuilder())
.build();
.build();

try {
validator.clearWarnings();
Expand Down Expand Up @@ -371,5 +365,4 @@ private static TxMode txMode(int level, boolean isReadOnly) throws SQLException
throw new SQLException(YdbConst.UNSUPPORTED_TRANSACTION_LEVEL + level);
}
}

}
113 changes: 81 additions & 32 deletions jdbc/src/main/java/tech/ydb/jdbc/context/StreamQueryResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

import tech.ydb.core.Issue;
import tech.ydb.core.Result;
Expand All @@ -37,20 +40,24 @@
* @author Aleksandr Gorshenin
*/
public class StreamQueryResult implements YdbQueryResult {
private static final Logger LOGGER = Logger.getLogger(StreamQueryResult.class.getName());

private static final int DDL_EXPRESSION = -1;
private static final int UPDATE_EXPRESSION = -2;

private final String msg;
private final YdbStatement statement;
private final Runnable stopRunnable;

private final CompletableFuture<Status> finishFuture = new CompletableFuture<>();
private final CompletableFuture<Status> streamFuture = new CompletableFuture<>();
private final CompletableFuture<Result<StreamQueryResult>> startFuture = new CompletableFuture<>();

private final int[] resultIndexes;
private final List<CompletableFuture<Result<LazyResultSet>>> resultFutures = new ArrayList<>();
private final AtomicBoolean streamCancelled = new AtomicBoolean(false);

private int resultIndex = 0;
private volatile boolean resultClosed = false;

public StreamQueryResult(String msg, YdbStatement statement, YdbQuery query, Runnable stopRunnable) {
this.msg = msg;
Expand Down Expand Up @@ -78,6 +85,7 @@ public StreamQueryResult(String msg, YdbStatement statement, YdbQuery query, Run
}

public CompletableFuture<Result<StreamQueryResult>> execute(QueryStream stream, Runnable finish) {
LOGGER.log(Level.FINE, "Stream executed by QueryStream");
stream.execute(new QueryPartsHandler())
.thenApply(Result::getStatus)
.whenComplete(this::onStreamFinished)
Expand All @@ -88,23 +96,24 @@ public CompletableFuture<Result<StreamQueryResult>> execute(QueryStream stream,
public CompletableFuture<Result<StreamQueryResult>> execute(
GrpcReadStream<ResultSetReader> stream, Runnable finish
) {
stream.start(rsr -> onResultSet(0, rsr))
LOGGER.log(Level.FINE, "Stream executed by ScanQuery");
stream.start(new ScanQueryHandler())
.whenComplete(this::onStreamFinished)
.thenRun(finish);
return startFuture;
}

private void onStreamFinished(Status status, Throwable th) {
if (th != null) {
finishFuture.completeExceptionally(th);
streamFuture.completeExceptionally(th);
for (CompletableFuture<Result<LazyResultSet>> future: resultFutures) {
future.completeExceptionally(th);
}
startFuture.completeExceptionally(th);
}

if (status != null) {
finishFuture.complete(status);
streamFuture.complete(status);
if (status.isSuccess()) {
for (CompletableFuture<Result<LazyResultSet>> future: resultFutures) {
future.complete(Result.success(new LazyResultSet(statement, new ColumnInfo[0]), status));
Expand All @@ -128,9 +137,49 @@ private void onStreamFinished(Status status, Throwable th) {
}
}

private void closeResultSet(int index) throws SQLException {
try {
CompletableFuture<Result<LazyResultSet>> future = resultFutures.get(index);
if (future != null) {
future.join().getValue().close();
}
} catch (UnexpectedResultException ex) {
throw ExceptionFactory.createException("Cannot call '" + msg + "' with " + ex.getStatus(), ex);
}
}

private boolean isStreamStopped() {
if (!resultClosed) {
return false;
}

if (!streamFuture.isDone() && streamCancelled.compareAndSet(false, true)) {
LOGGER.log(Level.FINE, "Stream cancel");
stopRunnable.run();
}

return true;
}

@Override
public void close() throws SQLException {
Status status = finishFuture.join();
if (startFuture.isDone() && resultClosed) {
return;
}

LOGGER.log(Level.FINE, "Stream closing");

resultClosed = true;
Status status = streamFuture.join();

statement.getValidator().addStatusIssues(status);

if (streamCancelled.get()) {
LOGGER.log(Level.FINE, "Stream canceled and finished with status {0}", status);
return;
}

LOGGER.log(Level.FINE, "Stream closed with status {0}", status);
if (!status.isSuccess()) {
throw ExceptionFactory.createException("Cannot execute '" + msg + "' with " + status,
new UnexpectedResultException("Unexpected status", status)
Expand Down Expand Up @@ -178,17 +227,6 @@ public boolean hasResultSets() throws SQLException {
return resultIndexes[resultIndex] >= 0;
}

private void closeResultSet(int index) throws SQLException {
try {
CompletableFuture<Result<LazyResultSet>> future = resultFutures.get(index);
if (future != null) {
future.join().getValue().close();
}
} catch (UnexpectedResultException ex) {
throw ExceptionFactory.createException("Cannot call '" + msg + "' with " + ex.getStatus(), ex);
}
}

@Override
public boolean getMoreResults(int current) throws SQLException {
if (resultFutures == null || resultIndex >= resultFutures.size()) {
Expand Down Expand Up @@ -225,25 +263,28 @@ private void onResultSet(int index, ResultSetReader rsr) {

Result<LazyResultSet> res = future.join();
if (res.isSuccess()) {
try {
res.getValue().addResultSet(rsr);
} catch (InterruptedException ex) {
stopRunnable.run();
}
res.getValue().addResultSet(rsr);
}
}

private class ScanQueryHandler implements GrpcReadStream.Observer<ResultSetReader> {
@Override
public void onNext(ResultSetReader part) {
onResultSet(0, part);
startFuture.complete(Result.success(StreamQueryResult.this));
}
}

private class QueryPartsHandler implements QueryStream.PartsHandler {
@Override
public void onIssues(Issue[] issues) {
startFuture.complete(Result.success(StreamQueryResult.this));
statement.getValidator().addStatusIssues(Arrays.asList(issues));
}

@Override
public void onNextPart(QueryResultPart part) {
startFuture.complete(Result.success(StreamQueryResult.this));
onResultSet((int) part.getResultSetIndex(), part.getResultSetReader());
startFuture.complete(Result.success(StreamQueryResult.this));
}
}

Expand All @@ -267,13 +308,25 @@ public void cleanQueue() {
}
}

public void addResultSet(ResultSetReader rsr) throws InterruptedException {
if (isClosed) {
public void addResultSet(ResultSetReader rsr) {
try {
do {
if (isStreamStopped()) {
close();
return;
}
} while (!readers.offer(rsr, 100, TimeUnit.MILLISECONDS));
} catch (InterruptedException ex) {
if (streamFuture.completeExceptionally(ex)) {
LOGGER.log(Level.WARNING, "LazyResultSet offer interrupted");
stopRunnable.run();
}
return;
}
if (readers.offer(rsr, 60, TimeUnit.SECONDS)) {
rowsCount.addAndGet(rsr.getRowCount());
}

long total = rowsCount.addAndGet(rsr.getRowCount());
LOGGER.log(Level.FINEST, "LazyResultSet got {0} rows", total);

if (isClosed) {
cleanQueue();
}
Expand Down Expand Up @@ -322,10 +375,6 @@ public void complete() {

@Override
public void close() {
if (isClosed) {
return;
}

isClosed = true;
current = null;
cleanQueue();
Expand Down
35 changes: 11 additions & 24 deletions jdbc/src/main/java/tech/ydb/jdbc/impl/BaseYdbStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ public BaseYdbStatement(Logger logger, YdbConnection connection, int resultSetTy
this.bulkQueryTxMode = props.getBulkQueryTxMode();
}

private void ensureOpened() throws SQLException {
connection.getExecutor().ensureOpened();
}

@Override
public YdbValidator getValidator() {
return validator;
Expand Down Expand Up @@ -84,7 +88,8 @@ public int getResultSetType() {
}

@Override
public SQLWarning getWarnings() {
public SQLWarning getWarnings() throws SQLException {
ensureOpened();
return validator.toSQLWarnings();
}

Expand All @@ -100,7 +105,6 @@ public int getQueryTimeout() {

@Override
public void setQueryTimeout(int seconds) throws SQLException {
ensureOpened();
queryTimeout = seconds;
}

Expand All @@ -126,36 +130,20 @@ public void setMaxRows(int max) {

@Override
public YdbResultSet getResultSet() throws SQLException {
ensureOpened();
return state.getCurrentResultSet();
}

@Override
public boolean getMoreResults(int current) throws SQLException {
ensureOpened();
return state.getMoreResults(current);
}

@Override
public int getUpdateCount() throws SQLException {
ensureOpened();
return state.getUpdateCount();
}

private void ensureOpened() throws SQLException {
if (isClosed) {
throw new SQLException(YdbConst.CLOSED_CONNECTION);
}
}

@Override
public void waitReady() throws SQLException {
state.close();
}

protected void cleanState() throws SQLException {
ensureOpened();

state.close();
state = YdbQueryResult.EMPTY;

Expand All @@ -169,7 +157,7 @@ protected boolean updateState(YdbQueryResult result) throws SQLException {

protected YdbQueryResult executeBulkUpsert(YdbQuery query, String tablePath, ListValue rows)
throws SQLException {
connection.getExecutor().ensureOpened();
ensureOpened();

if (connection.getExecutor().isInsideTransaction()) {
switch (bulkQueryTxMode) {
Expand All @@ -188,12 +176,12 @@ protected YdbQueryResult executeBulkUpsert(YdbQuery query, String tablePath, Lis
}

protected YdbQueryResult executeExplainQuery(YdbQuery query) throws SQLException {
connection.getExecutor().ensureOpened();
ensureOpened();
return connection.getExecutor().executeExplainQuery(this, query);
}

protected YdbQueryResult executeDataQuery(YdbQuery query, String yql, Params params) throws SQLException {
connection.getExecutor().ensureOpened();
ensureOpened();

YdbContext ctx = connection.getCtx();
if (ctx.queryStatsEnabled()) {
Expand All @@ -212,7 +200,7 @@ protected YdbQueryResult executeDataQuery(YdbQuery query, String yql, Params par
}

protected YdbQueryResult executeSchemeQuery(YdbQuery query) throws SQLException {
connection.getExecutor().ensureOpened();
ensureOpened();

if (connection.getExecutor().isInsideTransaction()) {
switch (schemeQueryTxMode) {
Expand All @@ -231,7 +219,7 @@ protected YdbQueryResult executeSchemeQuery(YdbQuery query) throws SQLException
}

protected YdbQueryResult executeScanQuery(YdbQuery query, String yql, Params params) throws SQLException {
connection.getExecutor().ensureOpened();
ensureOpened();

if (connection.getExecutor().isInsideTransaction()) {
switch (scanQueryTxMode) {
Expand Down Expand Up @@ -297,7 +285,6 @@ public void cancel() {

@Override
public boolean getMoreResults() throws SQLException {
ensureOpened();
return getMoreResults(Statement.KEEP_CURRENT_RESULT);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ public String getSchema() {

@Override
public int getNetworkTimeout() throws SQLException {
executor.ensureOpened();
return (int) ctx.getOperationProperties().getDeadlineTimeout().toMillis();
}

Expand Down
Loading