Skip to content

Commit

Permalink
Fixed error of stat collection for prepared statements
Browse files Browse the repository at this point in the history
  • Loading branch information
alex268 committed Aug 30, 2024
1 parent 517be81 commit bb1202d
Show file tree
Hide file tree
Showing 11 changed files with 90 additions and 72 deletions.
12 changes: 8 additions & 4 deletions jdbc/src/main/java/tech/ydb/jdbc/YdbConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import tech.ydb.jdbc.context.YdbContext;
import tech.ydb.jdbc.context.YdbValidator;
import tech.ydb.jdbc.query.ExplainedQuery;
import tech.ydb.jdbc.query.YdbQuery;
import tech.ydb.table.query.Params;
import tech.ydb.table.result.ResultSetReader;

Expand Down Expand Up @@ -36,27 +37,30 @@ public interface YdbConnection extends Connection {
/**
* Explicitly execute query as a data query
*
* @param yql query to execute
* @param query query to execute
* @param yql YQL text to execute
* @param params parameters for query
* @param timeout timeout of operation
* @param keepInCache flag to store query in server-side cache
* @param validator handler for logging and warnings
* @return list of result set
* @throws SQLException if query cannot be executed
*/
List<ResultSetReader> executeDataQuery(String yql, YdbValidator validator,
List<ResultSetReader> executeDataQuery(YdbQuery query, String yql, YdbValidator validator,
int timeout, boolean keepInCache, Params params) throws SQLException;

/**
* Explicitly execute query as a scan query
*
* @param yql query to execute
* @param query query to execute
* @param yql YQL text to execute
* @param params parameters for query
* @param validator handler for logging and warnings
* @return single result set with rows
* @throws SQLException if query cannot be executed
*/
ResultSetReader executeScanQuery(String yql, YdbValidator validator, Params params) throws SQLException;
ResultSetReader executeScanQuery(YdbQuery query, String yql, YdbValidator validator, Params params)
throws SQLException;

/**
* Explicitly explain this query
Expand Down
8 changes: 6 additions & 2 deletions jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import tech.ydb.core.UnexpectedResultException;
import tech.ydb.jdbc.exception.ExceptionFactory;
import tech.ydb.jdbc.query.QueryType;
import tech.ydb.jdbc.query.YdbQuery;
import tech.ydb.table.Session;
import tech.ydb.table.TableClient;
import tech.ydb.table.query.Params;
Expand Down Expand Up @@ -50,15 +51,18 @@ public void executeSchemeQuery(YdbContext ctx, YdbValidator validator, String yq
}

@Override
public ResultSetReader executeScanQuery(YdbContext ctx, YdbValidator validator, String yql, Params params)
throws SQLException {
public ResultSetReader executeScanQuery(
YdbContext ctx, YdbValidator validator, YdbQuery query, String yql, Params params
) throws SQLException {
ensureOpened();

Collection<ResultSetReader> resultSets = new LinkedBlockingQueue<>();
Duration scanQueryTimeout = ctx.getOperationProperties().getScanQueryTimeout();
ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder()
.withRequestTimeout(scanQueryTimeout)
.build();

ctx.traceQuery(query, yql);
try (Session session = createNewTableSession(validator)) {
validator.execute(QueryType.SCAN_QUERY + " >>\n" + yql,
() -> session.executeScanQuery(yql, params, settings).start(resultSets::add));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import tech.ydb.jdbc.exception.ExceptionFactory;
import tech.ydb.jdbc.query.ExplainedQuery;
import tech.ydb.jdbc.query.QueryType;
import tech.ydb.jdbc.query.YdbQuery;
import tech.ydb.query.QueryClient;
import tech.ydb.query.QuerySession;
import tech.ydb.query.QueryStream;
Expand Down Expand Up @@ -196,7 +197,8 @@ public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException

@Override
public List<ResultSetReader> executeDataQuery(
YdbContext ctx, YdbValidator validator, String yql, long timeout, boolean keepInCache, Params params
YdbContext ctx, YdbValidator validator, YdbQuery query,
String yql, long timeout, boolean keepInCache, Params params
) throws SQLException {
ensureOpened();

Expand All @@ -206,6 +208,7 @@ public List<ResultSetReader> executeDataQuery(
}
final ExecuteQuerySettings settings = builder.build();

ctx.traceQuery(query, yql);
if (tx == null) {
tx = createNewQuerySession(validator).createNewTransaction(txMode);
}
Expand Down
24 changes: 16 additions & 8 deletions jdbc/src/main/java/tech/ydb/jdbc/context/QueryStat.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import tech.ydb.core.Status;
import tech.ydb.jdbc.common.FixedResultSetFactory;
import tech.ydb.jdbc.query.YdbQuery;
import tech.ydb.table.result.ResultSetReader;

/**
Expand All @@ -18,6 +17,7 @@ public class QueryStat {
private static final FixedResultSetFactory STATS_RS_FACTORY = FixedResultSetFactory.newBuilder()
.addTextColumn("sql")
.addBooleanColumn("is_fullscan")
.addBooleanColumn("is_error")
.addLongColumn("executed")
.addTextColumn("yql")
.addTextColumn("ast")
Expand All @@ -31,23 +31,26 @@ public class QueryStat {
private final String plan;
private final LongAdder usage;
private final boolean isFullScan;
private final boolean isError;

public QueryStat(YdbQuery query, String ast, String plan) {
this.originSQL = query.getOriginQuery();
this.preparedYQL = query.getPreparedYql();
public QueryStat(String sql, String yql, String ast, String plan) {
this.originSQL = sql;
this.preparedYQL = yql;
this.ast = ast;
this.plan = plan;
this.usage = new LongAdder();
this.isFullScan = plan.contains("\"Node Type\":\"TableFullScan\"");
this.isError = false;
}

public QueryStat(YdbQuery query, Status error) {
this.originSQL = query.getOriginQuery();
this.preparedYQL = query.getPreparedYql();
this.ast = error.toString();
public QueryStat(String sql, String yql, Status error) {
this.originSQL = sql;
this.preparedYQL = yql;
this.ast = null;
this.plan = error.toString();
this.usage = new LongAdder();
this.isFullScan = false;
this.isError = true;
}

public long getUsageCounter() {
Expand All @@ -74,6 +77,10 @@ public boolean isFullScan() {
return isFullScan;
}

public boolean isError() {
return isError;
}

public void incrementUsage() {
this.usage.increment();
}
Expand All @@ -84,6 +91,7 @@ public static ResultSetReader toResultSetReader(Collection<QueryStat> stats) {
builder.newRow()
.withTextValue("sql", stat.originSQL)
.withBoolValue("is_fullscan", stat.isFullScan)
.withBoolValue("is_error", stat.isError)
.withLongValue("executed", stat.usage.longValue())
.withTextValue("yql", stat.preparedYQL)
.withTextValue("ast", stat.ast)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import tech.ydb.jdbc.YdbConst;
import tech.ydb.jdbc.query.ExplainedQuery;
import tech.ydb.jdbc.query.QueryType;
import tech.ydb.jdbc.query.YdbQuery;
import tech.ydb.table.Session;
import tech.ydb.table.query.DataQueryResult;
import tech.ydb.table.query.ExplainDataQueryResult;
Expand Down Expand Up @@ -168,7 +169,8 @@ public ExplainedQuery executeExplainQuery(YdbContext ctx, YdbValidator validator

@Override
public List<ResultSetReader> executeDataQuery(
YdbContext ctx, YdbValidator validator, String yql, long timeout, boolean keepInCache, Params params
YdbContext ctx, YdbValidator validator, YdbQuery query,
String yql, long timeout, boolean keepInCache, Params params
) throws SQLException {
ensureOpened();

Expand Down
62 changes: 32 additions & 30 deletions jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
Expand Down Expand Up @@ -191,18 +191,13 @@ public Collection<QueryStat> getQueryStats() {
if (queryStatesCache == null) {
return Collections.emptyList();
}
Set<QueryStat> sortedByUsage = new TreeSet<>(Comparator.comparingLong(QueryStat::getUsageCounter).reversed());
sortedByUsage.addAll(queryStatesCache.asMap().values());
return sortedByUsage;
}

public void traceQueryExecution(YdbQuery query) {
if (queryStatesCache != null) {
QueryStat stat = queryStatesCache.getIfPresent(query.getOriginQuery());
if (stat != null) {
stat.incrementUsage();
}
}
List<QueryStat> sorted = new ArrayList<>(queryStatesCache.asMap().values());
Collections.sort(sorted,
Comparator
.comparingLong(QueryStat::getUsageCounter).reversed()
.thenComparing(QueryStat::getPreparedYQL)
);
return sorted;
}

public void register() {
Expand Down Expand Up @@ -308,26 +303,33 @@ public YdbQuery findOrParseYdbQuery(String sql) throws SQLException {
queriesCache.put(sql, cached);
}

if (queryStatesCache != null) {
QueryStat stat = queryStatesCache.getIfPresent(sql);
if (stat == null) {
final String preparedYQL = cached.getPreparedYql();
final ExplainDataQuerySettings settings = withDefaultTimeout(new ExplainDataQuerySettings());
Result<ExplainDataQueryResult> res = retryCtx.supplyResult(
session -> session.explainDataQuery(preparedYQL, settings)
).join();

if (res.isSuccess()) {
ExplainDataQueryResult exp = res.getValue();
stat = new QueryStat(cached, exp.getQueryAst(), exp.getQueryPlan());
} else {
stat = new QueryStat(cached, res.getStatus());
}
queryStatesCache.put(sql, stat);
return cached;
}

public void traceQuery(YdbQuery query, String yql) {
if (queryStatesCache == null) {
return;
}

QueryStat stat = queryStatesCache.getIfPresent(yql);
if (stat == null) {
final ExplainDataQuerySettings settings = withDefaultTimeout(new ExplainDataQuerySettings());
Result<ExplainDataQueryResult> res = retryCtx.supplyResult(
session -> session.explainDataQuery(yql, settings)
).join();

if (res.isSuccess()) {
ExplainDataQueryResult exp = res.getValue();
stat = new QueryStat(query.getOriginQuery(), yql, exp.getQueryAst(), exp.getQueryPlan());
} else {
stat = new QueryStat(query.getOriginQuery(), yql, res.getStatus());
}

queryStatesCache.put(yql, stat);
}

return cached;
stat.incrementUsage();
}

public YdbPreparedQuery findOrPrepareParams(YdbQuery query, YdbPrepareMode mode) throws SQLException {
Expand Down
5 changes: 3 additions & 2 deletions jdbc/src/main/java/tech/ydb/jdbc/context/YdbExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import tech.ydb.jdbc.YdbConst;
import tech.ydb.jdbc.query.ExplainedQuery;
import tech.ydb.jdbc.query.YdbQuery;
import tech.ydb.table.query.Params;
import tech.ydb.table.result.ResultSetReader;

Expand Down Expand Up @@ -34,10 +35,10 @@ default void ensureOpened() throws SQLException {

void executeSchemeQuery(YdbContext ctx, YdbValidator validator, String yql) throws SQLException;

List<ResultSetReader> executeDataQuery(YdbContext ctx, YdbValidator validator, String yql,
List<ResultSetReader> executeDataQuery(YdbContext ctx, YdbValidator validator, YdbQuery query, String yql,
long timeout, boolean poolable, Params params) throws SQLException;

ResultSetReader executeScanQuery(YdbContext ctx, YdbValidator validator, String yql, Params params)
ResultSetReader executeScanQuery(YdbContext ctx, YdbValidator validator, YdbQuery query, String yql, Params params)
throws SQLException;

ExplainedQuery executeExplainQuery(YdbContext ctx, YdbValidator validator, String yql)
Expand Down
8 changes: 5 additions & 3 deletions jdbc/src/main/java/tech/ydb/jdbc/impl/BaseYdbStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,16 @@ protected List<YdbResult> executeExplainQuery(YdbQuery query) throws SQLExceptio
return Collections.singletonList(new YdbResult(new YdbResultSetImpl(this, result)));
}

protected List<YdbResult> executeScanQuery(String yql, Params params) throws SQLException {
ResultSetReader result = connection.executeScanQuery(yql, validator, params);
protected List<YdbResult> executeScanQuery(YdbQuery query, String yql, Params params) throws SQLException {
connection.getCtx().traceQuery(query, yql);
ResultSetReader result = connection.executeScanQuery(query, yql, validator, params);
return Collections.singletonList(new YdbResult(new YdbResultSetImpl(this, result)));
}

protected List<YdbResult> executeDataQuery(YdbQuery query, String yql, Params params) throws SQLException {
connection.getCtx().traceQuery(query, yql);
List<ResultSetReader> resultSets = connection
.executeDataQuery(yql, validator, getQueryTimeout(), isPoolable(), params);
.executeDataQuery(query, yql, validator, getQueryTimeout(), isPoolable(), params);

List<YdbResult> results = new ArrayList<>();
int idx = 0;
Expand Down
9 changes: 5 additions & 4 deletions jdbc/src/main/java/tech/ydb/jdbc/impl/YdbConnectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -205,13 +205,14 @@ public void executeSchemeQuery(String yql, YdbValidator validator) throws SQLExc
}

@Override
public List<ResultSetReader> executeDataQuery(String yql, YdbValidator validator,
public List<ResultSetReader> executeDataQuery(YdbQuery query, String yql, YdbValidator validator,
int timeout, boolean poolable, Params params) throws SQLException {
return executor.executeDataQuery(ctx, validator, yql, timeout, poolable, params);
return executor.executeDataQuery(ctx, validator, query, yql, timeout, poolable, params);
}

@Override
public ResultSetReader executeScanQuery(String yql, YdbValidator validator, Params params) throws SQLException {
public ResultSetReader executeScanQuery(YdbQuery query, String yql, YdbValidator validator, Params params)
throws SQLException {
executor.ensureOpened();

if (executor.isInsideTransaction()) {
Expand All @@ -227,7 +228,7 @@ public ResultSetReader executeScanQuery(String yql, YdbValidator validator, Para
}
}

return executor.executeScanQuery(ctx, validator, yql, params);
return executor.executeScanQuery(ctx, validator, query, yql, params);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ public int[] executeBatch() throws SQLException {

try {
for (Params prm: prepared.getBatchParams()) {
getConnection().getCtx().traceQueryExecution(query);
executeDataQuery(query, prepared.getQueryText(prm), prm);
}
} finally {
Expand Down Expand Up @@ -126,13 +125,12 @@ public boolean execute() throws SQLException {
List<YdbResult> newState = null;

Params prms = prepared.getCurrentParams();
getConnection().getCtx().traceQueryExecution(query);
switch (query.getType()) {
case DATA_QUERY:
newState = executeDataQuery(query, prepared.getQueryText(prms), prms);
break;
case SCAN_QUERY:
newState = executeScanQuery(prepared.getQueryText(prms), prms);
newState = executeScanQuery(query, prepared.getQueryText(prms), prms);
break;
default:
throw new IllegalStateException("Internal error. Unsupported query type " + query.getType());
Expand All @@ -146,7 +144,7 @@ public boolean execute() throws SQLException {
public YdbResultSet executeScanQuery() throws SQLException {
cleanState();
Params prms = prepared.getCurrentParams();
List<YdbResult> state = executeScanQuery(prepared.getQueryText(prms), prms);
List<YdbResult> state = executeScanQuery(query, prepared.getQueryText(prms), prms);
prepared.clearParameters();
updateState(state);
return getResultSet();
Expand Down
Loading

0 comments on commit bb1202d

Please sign in to comment.