Skip to content

Commit

Permalink
Customized transaction tracer
Browse files Browse the repository at this point in the history
  • Loading branch information
alex268 committed Oct 14, 2024
1 parent ba0e112 commit 3d21311
Show file tree
Hide file tree
Showing 13 changed files with 172 additions and 37 deletions.
5 changes: 5 additions & 0 deletions jdbc/src/main/java/tech/ydb/jdbc/YdbDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ public boolean acceptsURL(String url) {
return YdbConfig.isYdb(url);
}

@Override
public String toString() {
return YdbDriverInfo.DRIVER_FULL_NAME;
}

@Override
public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException {
YdbConfig config = YdbConfig.from(url, info);
Expand Down
27 changes: 13 additions & 14 deletions jdbc/src/main/java/tech/ydb/jdbc/YdbTracer.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -16,6 +16,7 @@
public class YdbTracer {
private static final Logger LOGGER = Logger.getLogger(YdbTracer.class.getName());
private static final ThreadLocal<YdbTracer> LOCAL = new ThreadLocal<>();
private static final AtomicLong ANONYMOUS_COUNTER = new AtomicLong(0);

private final Date startDate = new Date();
private final long startedAt = System.currentTimeMillis();
Expand All @@ -27,12 +28,10 @@ public class YdbTracer {

private class Record {
private final long executedAt = System.currentTimeMillis();
private final String type;
private final String comment;
private final String message;

public Record(String type, String comment) {
this.type = type;
this.comment = comment;
Record(String message) {
this.message = message;
}
}

Expand All @@ -50,18 +49,18 @@ public static YdbTracer current() {
return tracer;
}

public void trace(String type, String comment) {
records.add(new Record(type, comment));
public void trace(String message) {
records.add(new Record(message));
}

public void setId(String id) {
this.txID = id;
trace("SET ID", id);
trace("set-id " + id);
}

public void markToPrint() {
this.isMarked = true;
trace("MARK TO PRINT", "");
trace("markToPrint");
}

public void close() {
Expand All @@ -76,14 +75,14 @@ public void close() {

long finishedAt = System.currentTimeMillis();

final String id = txID != null ? txID : "UKNOWN-" + ThreadLocalRandom.current().nextLong();
LOGGER.log(level, "Trace[{0}] started at {1}", new Object[] { id, startDate });
final String id = txID != null ? txID : "anonymous-" + ANONYMOUS_COUNTER.incrementAndGet();
LOGGER.log(level, "Trace[{0}] started at {1}", new Object[] {id, startDate});
long last = startedAt;
for (Record record: records) {
long ms = record.executedAt - last;
LOGGER.log(level, "Trace[{0}] {1} ms {2}: {3}", new Object[] { id, ms, record.type, record.comment });
LOGGER.log(level, "Trace[{0}] {1} ms {2}", new Object[] {id, ms, record.message});
last = record.executedAt;
}
LOGGER.log(level, "Trace[{0}] finished in {1} ms", new Object[] { id, finishedAt - startedAt });
LOGGER.log(level, "Trace[{0}] finished in {1} ms", new Object[] {id, finishedAt - startedAt});
}
}
43 changes: 38 additions & 5 deletions jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.jdbc.YdbConst;
import tech.ydb.jdbc.YdbStatement;
import tech.ydb.jdbc.YdbTracer;
import tech.ydb.jdbc.exception.ExceptionFactory;
import tech.ydb.jdbc.impl.YdbQueryResult;
import tech.ydb.jdbc.query.QueryType;
Expand All @@ -33,9 +34,11 @@ public abstract class BaseYdbExecutor implements YdbExecutor {
private final Duration sessionTimeout;
private final TableClient tableClient;
private final AtomicReference<YdbQueryResult> currResult;
protected final boolean traceEnabled;

public BaseYdbExecutor(YdbContext ctx) {
this.retryCtx = ctx.getRetryCtx();
this.traceEnabled = ctx.isTxTracerEnabled();
this.sessionTimeout = ctx.getOperationProperties().getSessionTimeout();
this.tableClient = ctx.getTableClient();
this.currResult = new AtomicReference<>();
Expand Down Expand Up @@ -74,6 +77,16 @@ public void ensureOpened() throws SQLException {
}
}

@Override
public YdbTracer trace(String message) {
if (!traceEnabled) {
return null;
}
YdbTracer tracer = YdbTracer.current();
tracer.trace(message);
return tracer;
}

@Override
public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query) throws SQLException {
ensureOpened();
Expand All @@ -83,11 +96,16 @@ public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query)
YdbValidator validator = statement.getValidator();

// Scheme query does not affect transactions or result sets
YdbTracer tracer = trace("--> scheme >>\n" + yql);
ExecuteSchemeQuerySettings settings = ctx.withDefaultTimeout(new ExecuteSchemeQuerySettings());
validator.execute(QueryType.SCHEME_QUERY + " >>\n" + yql,
validator.execute(QueryType.SCHEME_QUERY + " >>\n" + yql, tracer,
() -> retryCtx.supplyStatus(session -> session.executeSchemeQuery(yql, settings))
);

if (tracer != null && !isInsideTransaction()) {
tracer.close();
}

return updateCurrentResult(new StaticQueryResult(query, Collections.emptyList()));
}

Expand All @@ -98,10 +116,15 @@ public YdbQueryResult executeBulkUpsert(YdbStatement statement, YdbQuery query,

String yql = query.getPreparedYql();
YdbValidator validator = statement.getValidator();
validator.execute(QueryType.BULK_QUERY + " >>\n" + yql,
YdbTracer tracer = trace("--> bulk upsert >>\n" + yql);
validator.execute(QueryType.BULK_QUERY + " >>\n" + yql, tracer,
() -> retryCtx.supplyStatus(session -> session.executeBulkUpsert(tablePath, rows))
);

if (tracer != null && !isInsideTransaction()) {
tracer.close();
}

return updateCurrentResult(new StaticQueryResult(query, Collections.emptyList()));
}

Expand All @@ -116,11 +139,11 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S
ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder()
.withRequestTimeout(scanQueryTimeout)
.build();

String msg = QueryType.SCAN_QUERY + " >>\n" + yql;
final YdbTracer tracer = trace("--> scan query >>\n" + yql);
final Session session = createNewTableSession(validator);

String msg = QueryType.SCAN_QUERY + " >>\n" + yql;
StreamQueryResult lazy = validator.call(msg, () -> {
StreamQueryResult lazy = validator.call(msg, null, () -> {
final CompletableFuture<Result<StreamQueryResult>> future = new CompletableFuture<>();
final GrpcReadStream<ResultSetReader> stream = session.executeScanQuery(yql, params, settings);
final StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel);
Expand All @@ -134,11 +157,21 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S
if (th != null) {
result.onStreamFinished(th);
future.completeExceptionally(th);

if (tracer != null) {
tracer.trace("<-- " + th.getMessage());
tracer.close();
}
}
if (st != null) {
validator.addStatusIssues(st);
result.onStreamFinished(st);
future.complete(st.isSuccess() ? Result.success(result) : Result.fail(st));

if (tracer != null) {
tracer.trace("<-- " + st.toString());
tracer.close();
}
}
});

Expand Down
58 changes: 52 additions & 6 deletions jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import tech.ydb.jdbc.YdbConst;
import tech.ydb.jdbc.YdbResultSet;
import tech.ydb.jdbc.YdbStatement;
import tech.ydb.jdbc.YdbTracer;
import tech.ydb.jdbc.exception.ExceptionFactory;
import tech.ydb.jdbc.impl.YdbQueryResult;
import tech.ydb.jdbc.impl.YdbStaticResultSet;
Expand Down Expand Up @@ -187,14 +188,18 @@ public void commit(YdbContext ctx, YdbValidator validator) throws SQLException {
return;
}

YdbTracer tracer = trace("--> commit");
CommitTransactionSettings settings = ctx.withRequestTimeout(CommitTransactionSettings.newBuilder()).build();
try {
validator.clearWarnings();
validator.call("Commit TxId: " + localTx.getId(), () -> localTx.commit(settings));
validator.call("Commit TxId: " + localTx.getId(), tracer, () -> localTx.commit(settings));
} finally {
if (tx.compareAndSet(localTx, null)) {
localTx.getSession().close();
}
if (tracer != null) {
tracer.close();
}
}
}

Expand All @@ -207,16 +212,20 @@ public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException
return;
}

YdbTracer tracer = trace("--> rollback");
RollbackTransactionSettings settings = ctx.withRequestTimeout(RollbackTransactionSettings.newBuilder())
.build();

try {
validator.clearWarnings();
validator.execute("Rollback TxId: " + localTx.getId(), () -> localTx.rollback(settings));
validator.execute("Rollback TxId: " + localTx.getId(), tracer, () -> localTx.rollback(settings));
} finally {
if (tx.compareAndSet(localTx, null)) {
localTx.getSession().close();
}
if (tracer != null) {
tracer.close();
}
}
}

Expand Down Expand Up @@ -246,8 +255,9 @@ public YdbQueryResult executeDataQuery(
final QueryTransaction localTx = nextTx;

if (useStreamResultSet) {
YdbTracer tracer = trace("--> stream query >>\n" + yql);
String msg = "STREAM_QUERY >>\n" + yql;
StreamQueryResult lazy = validator.call(msg, () -> {
StreamQueryResult lazy = validator.call(msg, null, () -> {
final CompletableFuture<Result<StreamQueryResult>> future = new CompletableFuture<>();
final QueryStream stream = localTx.createQuery(yql, isAutoCommit, params, settings);
final StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel);
Expand All @@ -274,11 +284,27 @@ public void onNextPart(QueryResultPart part) {
if (th != null) {
future.completeExceptionally(th);
result.onStreamFinished(th);
if (tracer != null) {
tracer.trace("<-- " + th.getMessage());
if (localTx.isActive()) {
tracer.setId(localTx.getId());
} else {
tracer.close();
}
}
}
if (res != null) {
validator.addStatusIssues(res.getStatus());
future.complete(res.isSuccess() ? Result.success(result) : Result.fail(res.getStatus()));
result.onStreamFinished(res.getStatus());
if (tracer != null) {
tracer.trace("<-- " + res.getStatus().toString());
if (localTx.isActive()) {
tracer.setId(localTx.getId());
} else {
tracer.close();
}
}
}
});

Expand All @@ -288,8 +314,9 @@ public void onNextPart(QueryResultPart part) {
return updateCurrentResult(lazy);
}

YdbTracer tracer = trace("--> data query >>\n" + yql);
try {
QueryReader result = validator.call(QueryType.DATA_QUERY + " >>\n" + yql,
QueryReader result = validator.call(QueryType.DATA_QUERY + " >>\n" + yql, tracer,
() -> QueryReader.readFrom(localTx.createQuery(yql, isAutoCommit, params, settings))
);
validator.addStatusIssues(result.getIssueList());
Expand All @@ -305,6 +332,14 @@ public void onNextPart(QueryResultPart part) {
localTx.getSession().close();
}
}

if (tracer != null) {
if (localTx.isActive()) {
tracer.setId(localTx.getId());
} else {
tracer.close();
}
}
}
}

Expand All @@ -317,14 +352,20 @@ public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query)
YdbValidator validator = statement.getValidator();

// Scheme query does not affect transactions or result sets
YdbTracer tracer = trace("--> scheme query >>\n" + yql);
ExecuteQuerySettings settings = ctx.withRequestTimeout(ExecuteQuerySettings.newBuilder()).build();
try (QuerySession session = createNewQuerySession(validator)) {
validator.call(QueryType.SCHEME_QUERY + " >>\n" + yql, () -> session
validator.call(QueryType.SCHEME_QUERY + " >>\n" + yql, tracer, () -> session
.createQuery(yql, TxMode.NONE, Params.empty(), settings)
.execute(new IssueHandler(validator))
);
} finally {
if (tracer != null && tx.get() == null) {
tracer.close();
}
}


return updateCurrentResult(new StaticQueryResult(query, Collections.emptyList()));
}

Expand All @@ -340,9 +381,10 @@ public YdbQueryResult executeExplainQuery(YdbStatement statement, YdbQuery query
ExecuteQuerySettings settings = ctx.withRequestTimeout(ExecuteQuerySettings.newBuilder())
.withExecMode(QueryExecMode.EXPLAIN)
.build();
YdbTracer tracer = trace("--> explain query >>\n" + yql);

try (QuerySession session = createNewQuerySession(validator)) {
QueryInfo res = validator.call(QueryType.EXPLAIN_QUERY + " >>\n" + yql, () -> session
QueryInfo res = validator.call(QueryType.EXPLAIN_QUERY + " >>\n" + yql, tracer, () -> session
.createQuery(yql, TxMode.NONE, Params.empty(), settings)
.execute(new IssueHandler(validator))
);
Expand All @@ -354,6 +396,10 @@ public YdbQueryResult executeExplainQuery(YdbStatement statement, YdbQuery query
return updateCurrentResult(
new StaticQueryResult(statement, res.getStats().getQueryAst(), res.getStats().getQueryPlan())
);
} finally {
if (tracer != null && tx.get() == null) {
tracer.close();
}
}
}

Expand Down
Loading

0 comments on commit 3d21311

Please sign in to comment.