Skip to content

Commit

Permalink
Added impelemntation and tests of query analyzer
Browse files Browse the repository at this point in the history
  • Loading branch information
alex268 committed Aug 12, 2024
1 parent 4e581be commit efbaf8a
Show file tree
Hide file tree
Showing 8 changed files with 332 additions and 20 deletions.
95 changes: 95 additions & 0 deletions jdbc/src/main/java/tech/ydb/jdbc/context/QueryStat.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package tech.ydb.jdbc.context;

import java.util.Collection;
import java.util.concurrent.atomic.LongAdder;

import tech.ydb.core.Status;
import tech.ydb.jdbc.common.FixedResultSetFactory;
import tech.ydb.jdbc.query.YdbQuery;
import tech.ydb.table.result.ResultSetReader;

/**
*
* @author Aleksandr Gorshenin
*/
public class QueryStat {
public static final String QUERY = "print_jdbc_stats();";

private static final FixedResultSetFactory STATS_RS_FACTORY = FixedResultSetFactory.newBuilder()
.addTextColumn("sql")
.addBooleanColumn("is_fullscan")
.addLongColumn("executed")
.addTextColumn("yql")
.addTextColumn("ast")
.addTextColumn("plan")
.build();

private final String originSQL;
private final String preparedYQL;

private final String ast;
private final String plan;
private final LongAdder usage;
private final boolean isFullScan;

public QueryStat(YdbQuery query, String ast, String plan) {
this.originSQL = query.getOriginQuery();
this.preparedYQL = query.getPreparedYql();
this.ast = ast;
this.plan = plan;
this.usage = new LongAdder();
this.isFullScan = plan.contains("\"Node Type\":\"TableFullScan\"");
}

public QueryStat(YdbQuery query, Status error) {
this.originSQL = query.getOriginQuery();
this.preparedYQL = query.getPreparedYql();
this.ast = error.toString();
this.plan = error.toString();
this.usage = new LongAdder();
this.isFullScan = false;
}

public long getUsageCounter() {
return usage.longValue();
}

public String getOriginSQL() {
return originSQL;
}

public String getPreparedYQL() {
return preparedYQL;
}

public String getAat() {
return ast;
}

public String getPlan() {
return plan;
}

public boolean isFullScan() {
return isFullScan;
}

public void incrementUsage() {
this.usage.increment();
}

public static ResultSetReader toResultSetReader(Collection<QueryStat> stats) {
FixedResultSetFactory.ResultSetBuilder builder = STATS_RS_FACTORY.createResultSet();
for (QueryStat stat: stats) {
builder.newRow()
.withTextValue("sql", stat.originSQL)
.withBoolValue("is_fullscan", stat.isFullScan)
.withLongValue("executed", stat.usage.longValue())
.withTextValue("yql", stat.preparedYQL)
.withTextValue("ast", stat.ast)
.withTextValue("plan", stat.plan)
.build();
}
return builder.build();
}
}
61 changes: 43 additions & 18 deletions jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
Expand Down Expand Up @@ -178,6 +183,28 @@ public boolean hasConnections() {
return connectionsCount.get() > 0;
}

public boolean queryStatsEnabled() {
return queryStatesCache != null;
}

public Collection<QueryStat> getQueryStats() {
if (queryStatesCache == null) {
return Collections.emptyList();
}
Set<QueryStat> sortedByUsage = new TreeSet<>(Comparator.comparingLong(QueryStat::getUsageCounter).reversed());
sortedByUsage.addAll(queryStatesCache.asMap().values());
return sortedByUsage;
}

public void traceQueryExecution(YdbQuery query) {
if (queryStatesCache != null) {
QueryStat stat = queryStatesCache.getIfPresent(query.getOriginQuery());
if (stat != null) {
stat.incrementUsage();
}
}
}

public void register() {
int actual = connectionsCount.incrementAndGet();
int maxSize = tableClient.sessionPoolStats().getMaxSize();
Expand Down Expand Up @@ -279,26 +306,24 @@ public YdbQuery findOrParseYdbQuery(String sql) throws SQLException {
if (cached == null) {
cached = parseYdbQuery(sql);
queriesCache.put(sql, cached);
}

if (queryStatesCache != null) {
QueryStat stat = queryStatesCache.getIfPresent(sql);
if (stat == null) {
final String preparedYQL = cached.getPreparedYql();
final ExplainDataQuerySettings settings = withDefaultTimeout(new ExplainDataQuerySettings());
Result<ExplainDataQueryResult> res = retryCtx.supplyResult(
session -> session.explainDataQuery(preparedYQL, settings)
).join();

if (res.isSuccess()) {
ExplainDataQueryResult exp = res.getValue();
stat = new QueryStat(cached, exp.getQueryAst(), exp.getQueryPlan());
} else {
stat = new QueryStat(cached, res.getStatus());
}
queryStatesCache.put(sql, stat);
}
if (queryStatesCache != null) {
QueryStat stat = queryStatesCache.getIfPresent(sql);
if (stat == null) {
final String preparedYQL = cached.getPreparedYql();
final ExplainDataQuerySettings settings = withDefaultTimeout(new ExplainDataQuerySettings());
Result<ExplainDataQueryResult> res = retryCtx.supplyResult(
session -> session.explainDataQuery(preparedYQL, settings)
).join();

stat.incrementUsage();
if (res.isSuccess()) {
ExplainDataQueryResult exp = res.getValue();
stat = new QueryStat(cached, exp.getQueryAst(), exp.getQueryPlan());
} else {
stat = new QueryStat(cached, res.getStatus());
}
queryStatesCache.put(sql, stat);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public int[] executeBatch() throws SQLException {

try {
for (Params prm: prepared.getBatchParams()) {
getConnection().getCtx().traceQueryExecution(query);
executeDataQuery(query, prepared.getQueryText(prm), prm);
}
} finally {
Expand Down Expand Up @@ -123,7 +124,9 @@ public boolean execute() throws SQLException {
clearBatch();

List<YdbResult> newState = null;

Params prms = prepared.getCurrentParams();
getConnection().getCtx().traceQueryExecution(query);
switch (query.getType()) {
case DATA_QUERY:
newState = executeDataQuery(query, prepared.getQueryText(prms), prms);
Expand Down
18 changes: 17 additions & 1 deletion jdbc/src/main/java/tech/ydb/jdbc/impl/YdbStatementImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import tech.ydb.jdbc.YdbConnection;
import tech.ydb.jdbc.YdbConst;
import tech.ydb.jdbc.YdbResultSet;
import tech.ydb.jdbc.context.QueryStat;
import tech.ydb.jdbc.context.YdbContext;
import tech.ydb.jdbc.query.YdbQuery;
import tech.ydb.table.query.Params;

Expand Down Expand Up @@ -79,7 +82,20 @@ public int executeUpdate(String sql) throws SQLException {
public boolean execute(String sql) throws SQLException {
cleanState();

YdbQuery query = getConnection().getCtx().parseYdbQuery(sql);
YdbContext ctx = getConnection().getCtx();
YdbQuery query;

if (ctx.queryStatsEnabled()) {
if (sql != null && QueryStat.QUERY.equalsIgnoreCase(sql.trim())) {
YdbResultSet rs = new YdbResultSetImpl(this, QueryStat.toResultSetReader(ctx.getQueryStats()));
return updateState(Collections.singletonList(new YdbResult(rs)));
}
query = ctx.findOrParseYdbQuery(sql);
ctx.traceQueryExecution(query);
} else {
query = ctx.parseYdbQuery(sql);
}

List<YdbResult> newState = null;
switch (query.getType()) {
case SCHEME_QUERY:
Expand Down
2 changes: 1 addition & 1 deletion jdbc/src/main/java/tech/ydb/jdbc/settings/YdbConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class YdbConfig {
"Use QueryService intead of TableService", false
);
static final YdbProperty<Boolean> FULLSCAN_DETECTOR_ENABLED = YdbProperty.bool(
"jdbc.ydb.fullscan_analyze", "Enable analizator for collecting query stats", false
"jdbcFullScanDetector", "Enable analizator for collecting query stats", false
);


Expand Down
83 changes: 83 additions & 0 deletions jdbc/src/test/java/tech/ydb/jdbc/impl/YdbConnectionImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -925,4 +925,87 @@ public void testUnsupportedComplexTypes(String type) throws SQLException {
() -> statement.execute(sql));
}
}

@Test
public void testFullScanAnalyzer() throws SQLException {
try (Connection connection = jdbc.createCustomConnection("jdbcFullScanDetector", "true")) {
String selectAll = QUERIES.selectAllSQL();
String selectByKey = QUERIES.selectAllByKey("1");
String preparedSelect = QUERIES.selectAllByKey("?");

try (Statement st = connection.createStatement()) {
try (ResultSet rs = st.executeQuery(" print_JDBC_stats(); ")) {
Assertions.assertFalse(rs.next()); // not stats
}

try (ResultSet rs = st.executeQuery(selectAll)) {
Assertions.assertFalse(rs.next());
}

try (ResultSet rs = st.executeQuery("Print_JDBC_stats();\n")) {
Assertions.assertTrue(rs.next());
Assertions.assertEquals(selectAll, rs.getString("sql"));
Assertions.assertEquals(true, rs.getBoolean("is_fullscan"));
Assertions.assertEquals(1l, rs.getLong("executed"));

Assertions.assertFalse(rs.next());
}

try (ResultSet rs = st.executeQuery(selectAll)) {
Assertions.assertFalse(rs.next());
}
try (ResultSet rs = st.executeQuery(selectByKey)) {
Assertions.assertFalse(rs.next());
}

try (ResultSet rs = st.executeQuery("Print_JDBC_stats();\n")) {
Assertions.assertTrue(rs.next());
Assertions.assertEquals(selectAll, rs.getString("sql"));
Assertions.assertEquals(true, rs.getBoolean("is_fullscan"));
Assertions.assertEquals(2l, rs.getLong("executed"));

Assertions.assertTrue(rs.next());
Assertions.assertEquals(selectByKey, rs.getString("sql"));
Assertions.assertEquals(false, rs.getBoolean("is_fullscan"));
Assertions.assertEquals(1l, rs.getLong("executed"));

Assertions.assertFalse(rs.next());
}

try (PreparedStatement ps = connection.prepareStatement(preparedSelect)) {
ps.setLong(1, 1);
try (ResultSet rs = ps.executeQuery()) {
Assertions.assertFalse(rs.next());
}
ps.setLong(1, 2);
try (ResultSet rs = ps.executeQuery()) {
Assertions.assertFalse(rs.next());
}
ps.setLong(1, 3);
try (ResultSet rs = ps.executeQuery()) {
Assertions.assertFalse(rs.next());
}
}

try (ResultSet rs = st.executeQuery("Print_JDBC_stats();\n")) {
Assertions.assertTrue(rs.next());
Assertions.assertEquals(preparedSelect, rs.getString("sql"));
Assertions.assertEquals(false, rs.getBoolean("is_fullscan"));
Assertions.assertEquals(3l, rs.getLong("executed"));

Assertions.assertTrue(rs.next());
Assertions.assertEquals(selectAll, rs.getString("sql"));
Assertions.assertEquals(true, rs.getBoolean("is_fullscan"));
Assertions.assertEquals(2l, rs.getLong("executed"));

Assertions.assertTrue(rs.next());
Assertions.assertEquals(selectByKey, rs.getString("sql"));
Assertions.assertEquals(false, rs.getBoolean("is_fullscan"));
Assertions.assertEquals(1l, rs.getLong("executed"));

Assertions.assertFalse(rs.next());
}
}
}
}
}
Loading

0 comments on commit efbaf8a

Please sign in to comment.