Skip to content

Commit

Permalink
Fixed usePrefixPath and table descriptions cache (#81)
Browse files Browse the repository at this point in the history
  • Loading branch information
alex268 authored Oct 17, 2024
2 parents 95def8f + 9ca7288 commit 7a124f7
Show file tree
Hide file tree
Showing 15 changed files with 324 additions and 237 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

* QueryService is enabled by default

## 2.2.13 ##

* Fixed usePrefixPath for preparing of YQL statements

## 2.2.12 ##

* Added option usePrefixPath
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,22 @@ Specify the YDB JDBC driver in the dependencies:
<dependency>
<groupId>tech.ydb.jdbc</groupId>
<artifactId>ydb-jdbc-driver</artifactId>
<<<<<<< HEAD
<version>2.3.0</version>
=======
<version>2.2.13</version>
>>>>>>> bugfix
</dependency>

<!-- Shaded version with included dependencies -->
<dependency>
<groupId>tech.ydb.jdbc</groupId>
<artifactId>ydb-jdbc-driver-shaded</artifactId>
<<<<<<< HEAD
<version>2.3.0</version>
=======
<version>2.2.13</version>
>>>>>>> bugfix
</dependency>
</dependencies>
```
Expand Down
1 change: 1 addition & 0 deletions jdbc/src/main/java/tech/ydb/jdbc/YdbConst.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public final class YdbConst {
public static final String CANNOT_LOAD_DATA_FROM_IS = "Unable to load data from input stream: ";
public static final String CANNOT_LOAD_DATA_FROM_READER = "Unable to load data from reader: ";
public static final String STATEMENT_IS_NOT_A_BATCH = "Statement cannot be executed as batch statement: ";
public static final String UNABLE_PREPARE_STATEMENT = "Cannot prepare statement: ";
public static final String MULTI_TYPES_IN_ONE_QUERY = "Query cannot contain expressions with different types: ";
public static final String SCAN_QUERY_INSIDE_TRANSACTION = "Scan query cannot be executed inside active "
+ "transaction. This behavior may be changed by property scanQueryTxMode";
Expand Down
120 changes: 14 additions & 106 deletions jdbc/src/main/java/tech/ydb/jdbc/YdbTracer.java
Original file line number Diff line number Diff line change
@@ -1,126 +1,34 @@
package tech.ydb.jdbc;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

import tech.ydb.jdbc.impl.YdbTracerImpl;



/**
*
* @author Aleksandr Gorshenin
*/
public class YdbTracer {
private static final Logger LOGGER = Logger.getLogger(YdbTracer.class.getName());
private static final ThreadLocal<YdbTracer> LOCAL = new ThreadLocal<>();
private static final AtomicLong ANONYMOUS_COUNTER = new AtomicLong(0);

private final Date startDate = new Date();
private final long startedAt = System.currentTimeMillis();
private final List<Record> records = new ArrayList<>();

private String txID = null;
private String label = null;
private boolean isMarked = false;
private boolean isClosed = false;

private class Record {
private final long executedAt = System.currentTimeMillis();
private final String message;
private final boolean isRequest;

Record(String message, boolean isRequest) {
this.message = message;
this.isRequest = isRequest;
}
public interface YdbTracer {
static void clear() {
YdbTracerImpl.clear();
}

public static void clear() {
LOCAL.remove();
static YdbTracer current() {
return YdbTracerImpl.current();
}

public static YdbTracer current() {
YdbTracer tracer = LOCAL.get();
if (tracer == null || tracer.isClosed) {
tracer = new YdbTracer();
LOCAL.set(tracer);
}

return tracer;
}

public void trace(String message) {
records.add(new Record(message, false));
}

public void traceRequest(String message) {
records.add(new Record(message, true));
}

public void setId(String id) {
if (!Objects.equals(id, txID)) {
this.txID = id;
trace("set-id " + id);
}
}

public void markToPrint() {
if (!isMarked) {
this.isMarked = true;
trace("markToPrint");
}
}

public void markToPrint(String label) {
if (!isMarked) {
this.isMarked = true;
this.label = label;
trace("markToPrint " + label);
}
}
void setId(String id);

public void close() {
isClosed = true;
void trace(String message);

LOCAL.remove();
void query(String queryText);

final Level level = isMarked ? Level.INFO : Level.FINE;
if (!LOGGER.isLoggable(level) || records.isEmpty()) {
return;
}
void markToPrint(String label);

long finishedAt = System.currentTimeMillis();
long requestsTime = 0;
void close();

String id = txID != null ? txID : "anonymous-" + ANONYMOUS_COUNTER.incrementAndGet();
String traceID = label == null ? id : label + "-" + id;
LOGGER.log(level, "Trace[{0}] started at {1}", new Object[] {traceID, startDate});
long last = startedAt;
long requestsCount = 0;
boolean lastIsRequest = false;
for (Record record: records) {
if (record.isRequest) {
requestsCount++;
lastIsRequest = true;
if (record.message != null) {
LOGGER.log(level, "Query[{0}]: {1}", new Object[] {traceID, record.message.replaceAll("\\s", " ")});
}
} else {
long ms = record.executedAt - last;
if (lastIsRequest) {
requestsTime += ms;
lastIsRequest = false;
}
LOGGER.log(level, "Trace[{0}] {1} ms {2}", new Object[] {traceID, ms, record.message});
last = record.executedAt;
}
}
LOGGER.log(level, "Trace[{0}] finished in {1} ms, {2} requests take {3} ms", new Object[] {
traceID, finishedAt - startedAt, requestsCount, requestsTime
});
default void markToPring() {
markToPrint(null);
}
}
54 changes: 17 additions & 37 deletions jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,26 +80,6 @@ public void ensureOpened() throws SQLException {
}
}

@Override
public YdbTracer trace(String message) {
if (!traceEnabled) {
return null;
}
YdbTracer tracer = YdbTracer.current();
tracer.trace(message);
return tracer;
}

protected YdbTracer traceRequest(String type, String message) {
if (!traceEnabled) {
return null;
}
YdbTracer tracer = YdbTracer.current();
tracer.trace("--> " + type);
tracer.traceRequest(message);
return tracer;
}

@Override
public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query) throws SQLException {
ensureOpened();
Expand All @@ -109,14 +89,16 @@ public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query)
YdbValidator validator = statement.getValidator();

// Scheme query does not affect transactions or result sets
YdbTracer tracer = traceRequest("scheme query", yql);
YdbTracer tracer = ctx.getTracer();
tracer.trace("--> scheme query");
tracer.query(yql);

ExecuteSchemeQuerySettings settings = ctx.withDefaultTimeout(new ExecuteSchemeQuerySettings());
validator.execute(QueryType.SCHEME_QUERY + " >>\n" + yql, tracer,
() -> retryCtx.supplyStatus(session -> session.executeSchemeQuery(yql, settings))
);

if (tracer != null && !isInsideTransaction()) {
if (!isInsideTransaction()) {
tracer.close();
}

Expand All @@ -130,12 +112,15 @@ public YdbQueryResult executeBulkUpsert(YdbStatement statement, YdbQuery query,

String yql = prefixPragma + query.getPreparedYql();
YdbValidator validator = statement.getValidator();
YdbTracer tracer = traceRequest("bulk upsert", yql);
YdbTracer tracer = statement.getConnection().getCtx().getTracer();
tracer.trace("--> bulk upsert");
tracer.query(yql);

validator.execute(QueryType.BULK_QUERY + " >>\n" + yql, tracer,
() -> retryCtx.supplyStatus(session -> session.executeBulkUpsert(tablePath, rows))
);

if (tracer != null && !isInsideTransaction()) {
if (!isInsideTransaction()) {
tracer.close();
}

Expand All @@ -156,10 +141,12 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S
.build();
String msg = QueryType.SCAN_QUERY + " >>\n" + yql;

final YdbTracer tracer = traceRequest("scan query", yql);
final Session session = createNewTableSession(validator);
YdbTracer tracer = ctx.getTracer();
tracer.trace("--> scan query");
tracer.query(yql);

StreamQueryResult lazy = validator.call(msg, null, () -> {
final Session session = createNewTableSession(validator);
StreamQueryResult lazy = validator.call(msg, () -> {
final CompletableFuture<Result<StreamQueryResult>> future = new CompletableFuture<>();
final GrpcReadStream<ResultSetReader> stream = session.executeScanQuery(yql, params, settings);
final StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel);
Expand All @@ -173,22 +160,15 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S
if (th != null) {
result.onStreamFinished(th);
future.completeExceptionally(th);

if (tracer != null) {
tracer.trace("<-- " + th.getMessage());
tracer.close();
}
tracer.trace("<-- " + th.getMessage());
}
if (st != null) {
validator.addStatusIssues(st);
result.onStreamFinished(st);
future.complete(st.isSuccess() ? Result.success(result) : Result.fail(st));

if (tracer != null) {
tracer.trace("<-- " + st.toString());
tracer.close();
}
tracer.trace("<-- " + st.toString());
}
tracer.close();
});

return future;
Expand Down
Loading

0 comments on commit 7a124f7

Please sign in to comment.