From 73cab124c6d4e1024e388d5d2e3d3e1975fd0c9a Mon Sep 17 00:00:00 2001 From: Alexander Wert Date: Sat, 27 May 2023 19:09:46 +0200 Subject: [PATCH] Added native instrumentation using OpenTelemetry API Signed-off-by: Alexander Wert --- java-client/build.gradle.kts | 39 ++- .../elastic/clients/transport/Endpoint.java | 14 + .../transport/endpoints/BinaryEndpoint.java | 10 +- .../transport/endpoints/BooleanEndpoint.java | 5 +- .../endpoints/DelegatingJsonEndpoint.java | 10 + .../transport/endpoints/EndpointBase.java | 18 ++ .../transport/endpoints/SimpleEndpoint.java | 10 +- .../endpoints/SimpleJsonEndpoint.java | 7 +- .../rest_client/InstrumentationUtil.java | 131 ++++++++++ .../rest_client/RestClientTransport.java | 144 +++++++---- .../experiments/api/FooOptRequest.java | 2 + .../experiments/api/FooRequest.java | 2 + .../experiments/generics/GenericClass.java | 4 + .../elastic/clients/json/JsonpUtilsTest.java | 6 +- .../rest_client/InstrumentationTest.java | 241 ++++++++++++++++++ 15 files changed, 567 insertions(+), 76 deletions(-) create mode 100644 java-client/src/main/java/co/elastic/clients/transport/rest_client/InstrumentationUtil.java create mode 100644 java-client/src/test/java/co/elastic/clients/transport/rest_client/InstrumentationTest.java diff --git a/java-client/build.gradle.kts b/java-client/build.gradle.kts index 8a2f9ae91..573791fa2 100644 --- a/java-client/build.gradle.kts +++ b/java-client/build.gradle.kts @@ -18,8 +18,8 @@ */ import com.github.jk1.license.ProjectData -import com.github.jk1.license.render.ReportRenderer import com.github.jk1.license.render.LicenseDataCollector +import com.github.jk1.license.render.ReportRenderer import java.io.FileWriter plugins { @@ -53,8 +53,8 @@ tasks.getByName("processResources") { if (name != "apis.json") { // Only process main source-set resources (test files are large) expand( - "version" to version, - "git_revision" to (if (rootProject.extra.has("gitHashFull")) rootProject.extra["gitHashFull"] else "unknown") + "version" to version, + "git_revision" to (if (rootProject.extra.has("gitHashFull")) rootProject.extra["gitHashFull"] else "unknown") ) } } @@ -69,7 +69,7 @@ tasks.withType { if (rootProject.extra.has("gitHashFull")) { val jar = this as Jar jar.manifest.attributes["X-Git-Revision"] = rootProject.extra["gitHashFull"] - jar.manifest.attributes["X-Git-Commit-Time"] = rootProject .extra["gitCommitTime"] + jar.manifest.attributes["X-Git-Commit-Time"] = rootProject.extra["gitCommitTime"] } else { throw GradleException("No git information available") } @@ -154,7 +154,7 @@ publishing { // are the same as the one used in the dependency section below. val xPathFactory = javax.xml.xpath.XPathFactory.newInstance() val depSelector = xPathFactory.newXPath() - .compile("/project/dependencies/dependency[groupId/text() = 'org.elasticsearch.client']") + .compile("/project/dependencies/dependency[groupId/text() = 'org.elasticsearch.client']") val versionSelector = xPathFactory.newXPath().compile("version") var foundVersion = false; @@ -183,6 +183,7 @@ dependencies { // the Java API client coexists with a 7.x HLRC work fine val elasticsearchVersion = "7.17.7" val jacksonVersion = "2.13.3" + val openTelemetryVersion = "1.26.0" // Apache 2.0 // https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-low.html @@ -201,6 +202,13 @@ dependencies { // https://github.com/eclipse-ee4j/parsson api("org.eclipse.parsson:parsson:1.0.0") + // OpenTelemetry API for native instrumentation of the client. + // Apache 2.0 + // https://github.com/open-telemetry/opentelemetry-java + implementation("io.opentelemetry", "opentelemetry-api", openTelemetryVersion) + implementation("io.opentelemetry", "opentelemetry-semconv", "$openTelemetryVersion-alpha") + + // EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 // https://github.com/eclipse-ee4j/jsonb-api compileOnly("jakarta.json.bind", "jakarta.json.bind-api", "2.0.0") @@ -236,6 +244,9 @@ dependencies { // https://www.testcontainers.org/ testImplementation("org.testcontainers", "testcontainers", "1.17.3") testImplementation("org.testcontainers", "elasticsearch", "1.17.3") + + + testImplementation("io.opentelemetry", "opentelemetry-sdk", openTelemetryVersion) } @@ -247,17 +258,17 @@ licenseReport { class SpdxReporter(val dest: File) : ReportRenderer { // License names to their SPDX identifier val spdxIds = mapOf( - "Apache License, Version 2.0" to "Apache-2.0", - "The Apache Software License, Version 2.0" to "Apache-2.0", - "BSD Zero Clause License" to "0BSD", - "Eclipse Public License 2.0" to "EPL-2.0", - "Eclipse Public License v. 2.0" to "EPL-2.0", - "Eclipse Public License - v 2.0" to "EPL-2.0", - "GNU General Public License, version 2 with the GNU Classpath Exception" to "GPL-2.0 WITH Classpath-exception-2.0", - "COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.0" to "CDDL-1.0" + "Apache License, Version 2.0" to "Apache-2.0", + "The Apache Software License, Version 2.0" to "Apache-2.0", + "BSD Zero Clause License" to "0BSD", + "Eclipse Public License 2.0" to "EPL-2.0", + "Eclipse Public License v. 2.0" to "EPL-2.0", + "Eclipse Public License - v 2.0" to "EPL-2.0", + "GNU General Public License, version 2 with the GNU Classpath Exception" to "GPL-2.0 WITH Classpath-exception-2.0", + "COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.0" to "CDDL-1.0" ) - private fun quote(str: String) : String { + private fun quote(str: String): String { return if (str.contains(',') || str.contains("\"")) { "\"" + str.replace("\"", "\"\"") + "\"" } else { diff --git a/java-client/src/main/java/co/elastic/clients/transport/Endpoint.java b/java-client/src/main/java/co/elastic/clients/transport/Endpoint.java index 2ef0a0989..eaa1759f8 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/Endpoint.java +++ b/java-client/src/main/java/co/elastic/clients/transport/Endpoint.java @@ -56,6 +56,18 @@ public interface Endpoint { */ String requestUrl(RequestT request); + /** + * Get the route for a request (i.e. URL pattern). + */ + String route(RequestT request); + + /** + * Get the path parameters for a request. + */ + default Map pathParameters(RequestT request) { + return Collections.emptyMap(); + } + /** * Get the query parameters for a request. */ @@ -104,6 +116,8 @@ default BinaryEndpoint withBinaryResponse() { this.id(), this::method, this::requestUrl, + this::route, + this::pathParameters, this::queryParameters, this::headers, this::body, diff --git a/java-client/src/main/java/co/elastic/clients/transport/endpoints/BinaryEndpoint.java b/java-client/src/main/java/co/elastic/clients/transport/endpoints/BinaryEndpoint.java index 5a7d52b0f..419184aa6 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/endpoints/BinaryEndpoint.java +++ b/java-client/src/main/java/co/elastic/clients/transport/endpoints/BinaryEndpoint.java @@ -28,26 +28,32 @@ public BinaryEndpoint( String id, Function method, Function requestUrl, + Function route, + Function> pathParameters, Function> queryParameters, Function> headers, Function body, Object ignored // same number of arguments as SimpleEndpoint ) { - super(id, method, requestUrl, queryParameters, headers, body); + super(id, method, requestUrl, route, pathParameters, queryParameters, headers, body); } public BinaryEndpoint( String id, Function method, Function requestUrl, + Function route, + Function> pathParameters, Function> queryParameters, Function> headers, boolean hasRequestBody, Object ignored // same number of arguments as SimpleEndpoint ) { - super(id, method, requestUrl, queryParameters, headers, hasRequestBody ? returnSelf() : returnNull()); + super(id, method, requestUrl, route, pathParameters, queryParameters, headers, hasRequestBody ? returnSelf() : returnNull()); } @Override diff --git a/java-client/src/main/java/co/elastic/clients/transport/endpoints/BooleanEndpoint.java b/java-client/src/main/java/co/elastic/clients/transport/endpoints/BooleanEndpoint.java index e9c386c8e..a50ae6ead 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/endpoints/BooleanEndpoint.java +++ b/java-client/src/main/java/co/elastic/clients/transport/endpoints/BooleanEndpoint.java @@ -28,13 +28,16 @@ public BooleanEndpoint( String id, Function method, Function requestUrl, + Function route, + Function> pathParameters, Function> queryParameters, Function> headers, boolean hasRequestBody, Object ignored // same number of arguments as SimpleEndpoint ) { - super(id, method, requestUrl, queryParameters, headers, hasRequestBody ? returnSelf() : returnNull()); + super(id, method, requestUrl, route, pathParameters, queryParameters, headers, hasRequestBody ? returnSelf() : returnNull()); } @Override diff --git a/java-client/src/main/java/co/elastic/clients/transport/endpoints/DelegatingJsonEndpoint.java b/java-client/src/main/java/co/elastic/clients/transport/endpoints/DelegatingJsonEndpoint.java index d79ba91f6..b12b47903 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/endpoints/DelegatingJsonEndpoint.java +++ b/java-client/src/main/java/co/elastic/clients/transport/endpoints/DelegatingJsonEndpoint.java @@ -48,6 +48,16 @@ public String requestUrl(Req request) { return endpoint.requestUrl(request); } + @Override + public String route(Req request) { + return endpoint.route(request); + } + + @Override + public Map pathParameters(Req request) { + return endpoint.pathParameters(request); + } + @Override public Map queryParameters(Req request) { return endpoint.queryParameters(request); diff --git a/java-client/src/main/java/co/elastic/clients/transport/endpoints/EndpointBase.java b/java-client/src/main/java/co/elastic/clients/transport/endpoints/EndpointBase.java index 62d17b703..f3635807b 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/endpoints/EndpointBase.java +++ b/java-client/src/main/java/co/elastic/clients/transport/endpoints/EndpointBase.java @@ -64,6 +64,8 @@ static Function returnSelf() { protected final String id; protected final Function method; protected final Function requestUrl; + protected final Function route; + protected final Function> pathParameters; protected final Function> queryParameters; protected final Function> headers; protected final Function body; @@ -72,6 +74,8 @@ public EndpointBase( String id, Function method, Function requestUrl, + Function route, + Function> pathParameters, Function> queryParameters, Function> headers, Function body @@ -79,6 +83,8 @@ public EndpointBase( this.id = id; this.method = method; this.requestUrl = requestUrl; + this.route = route; + this.pathParameters = pathParameters; this.queryParameters = queryParameters; this.headers = headers; this.body = body; @@ -99,6 +105,16 @@ public String requestUrl(RequestT request) { return this.requestUrl.apply(request); } + @Override + public String route(RequestT request) { + return this.route.apply(request); + } + + @Override + public Map pathParameters(RequestT request) { + return this.pathParameters.apply(request); + } + @Override public Map queryParameters(RequestT request) { return this.queryParameters.apply(request); @@ -133,6 +149,8 @@ public SimpleEndpoint withResponseDeseria id, method, requestUrl, + route, + pathParameters, queryParameters, headers, body, diff --git a/java-client/src/main/java/co/elastic/clients/transport/endpoints/SimpleEndpoint.java b/java-client/src/main/java/co/elastic/clients/transport/endpoints/SimpleEndpoint.java index 9e8041dcb..28ceff8a5 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/endpoints/SimpleEndpoint.java +++ b/java-client/src/main/java/co/elastic/clients/transport/endpoints/SimpleEndpoint.java @@ -36,12 +36,14 @@ public SimpleEndpoint( String id, Function method, Function requestUrl, + Function route, + Function> pathParameters, Function> queryParameters, Function> headers, Function body, JsonpDeserializer responseParser ) { - super(id, method, requestUrl, queryParameters, headers, body); + super(id, method, requestUrl, route, pathParameters, queryParameters, headers, body); this.responseParser = responseParser; } @@ -49,6 +51,8 @@ public SimpleEndpoint( String id, Function method, Function requestUrl, + Function route, + Function> pathParameters, Function> queryParameters, Function> headers, boolean hasResponseBody, @@ -58,6 +62,8 @@ public SimpleEndpoint( id, method, requestUrl, + route, + pathParameters, queryParameters, headers, hasResponseBody ? returnSelf() : returnNull(), @@ -82,6 +88,8 @@ public SimpleEndpoint withResponseDeseria id, method, requestUrl, + route, + pathParameters, queryParameters, headers, body, diff --git a/java-client/src/main/java/co/elastic/clients/transport/endpoints/SimpleJsonEndpoint.java b/java-client/src/main/java/co/elastic/clients/transport/endpoints/SimpleJsonEndpoint.java index c97a03675..95d73c222 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/endpoints/SimpleJsonEndpoint.java +++ b/java-client/src/main/java/co/elastic/clients/transport/endpoints/SimpleJsonEndpoint.java @@ -33,12 +33,15 @@ public SimpleJsonEndpoint( String id, Function method, Function requestUrl, + Function route, Function> queryParameters, + Map> pathParameters, + Function> queryParameters, Function> headers, boolean hasRequestBody, JsonpDeserializer responseParser ) { - super(id, method, requestUrl, queryParameters, headers, hasRequestBody, responseParser); + super(id, method, requestUrl, route, pathParameters, queryParameters, headers, hasRequestBody, responseParser); } } diff --git a/java-client/src/main/java/co/elastic/clients/transport/rest_client/InstrumentationUtil.java b/java-client/src/main/java/co/elastic/clients/transport/rest_client/InstrumentationUtil.java new file mode 100644 index 000000000..8ce6f8913 --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/transport/rest_client/InstrumentationUtil.java @@ -0,0 +1,131 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.transport.rest_client; + +import co.elastic.clients.transport.Endpoint; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import org.apache.http.HttpHost; + +import javax.annotation.Nullable; +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class InstrumentationUtil { + + private final OpenTelemetry openTelemetry; + private final Tracer tracer; + + protected InstrumentationUtil(@Nullable OpenTelemetry openTelemetry) { + if (openTelemetry == null) { + this.openTelemetry = GlobalOpenTelemetry.get(); + } else { + this.openTelemetry = openTelemetry; + } + tracer = this.openTelemetry.getTracer("elasticsearch-api"); + } + + protected Span createSpanForRequest(RequestT request, + Endpoint endpoint) { + String httpMethod = endpoint.method(request); + String route = endpoint.route(request); + + Span span = tracer.spanBuilder(httpMethod + " " + route).setSpanKind(SpanKind.CLIENT).startSpan(); + if (span.isRecording()) { + span.setAttribute(SemanticAttributes.DB_SYSTEM, "elasticsearch"); + span.setAttribute(SemanticAttributes.HTTP_METHOD, endpoint.method(request)); + span.setAttribute("url.path", endpoint.requestUrl(request)); + + Map queryParameters = endpoint.queryParameters(request); + if (!queryParameters.isEmpty()) { + String queryString = + queryParameters.entrySet().stream().map(e -> e.getKey() + "=" + e.getValue()).collect(Collectors.joining("&")); + span.setAttribute("url.query", queryString); + } + Map pathParameters = endpoint.pathParameters(request); + if (pathParameters.containsKey("index")) { + String indexValue = pathParameters.get("index"); + span.setAttribute("db.elasticsearch.target", indexValue); + } + + if (pathParameters.containsKey("id") && route.startsWith("/{index}/_") && route.endsWith("/{id}")) { + String docId = pathParameters.get("id"); + span.setAttribute("db.elasticsearch.doc_id", docId); + } + } + + return span; + } + + protected void captureHostInformation(@Nullable Span span, HttpHost host) { + if(span == null){ + return; + } + span.setAttribute("server.address", host.getHostName()); + span.setAttribute("server.port", host.getPort()); + span.setAttribute("url.scheme", host.getSchemeName()); + } + + protected void captureBody(@Nullable Span span, RequestT request, Endpoint endpoint, + List lines) { + if (shouldCaptureBody(span, request, endpoint)) { + StringBuilder bodyString = new StringBuilder(); + for (ByteBuffer line : lines) { + bodyString.append(StandardCharsets.UTF_8.decode(line)); + bodyString.append("\n"); + } + + span.setAttribute(SemanticAttributes.DB_STATEMENT, bodyString.toString()); + } + } + + protected void captureBody(@Nullable Span span, RequestT request, Endpoint endpoint, + ByteArrayOutputStream baos) { + if (shouldCaptureBody(span, request, endpoint)) { + span.setAttribute(SemanticAttributes.DB_STATEMENT, baos.toString()); + } + } + + private boolean shouldCaptureBody(@Nullable Span span, RequestT request, Endpoint endpoint) { + if (span == null || !span.isRecording()) { + return false; + } + + String route = endpoint.route(request); + + // We capture the request body in the span only for search-type requests. + return route.contains("/_search") || + route.contains("/_msearch") || + route.contains("/_async_search") || + route.equals("/{index}/_terms_enum") || + route.startsWith("/_render/template") || + route.equals("/{index}/_mvt/{field}/{zoom}/{x}/{y}"); + } + + +} diff --git a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientTransport.java b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientTransport.java index 255cd8e9f..abaee6726 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientTransport.java +++ b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientTransport.java @@ -24,18 +24,22 @@ import co.elastic.clients.json.JsonpDeserializer; import co.elastic.clients.json.JsonpMapper; import co.elastic.clients.json.NdJsonpSerializable; +import co.elastic.clients.transport.ElasticsearchTransport; +import co.elastic.clients.transport.Endpoint; import co.elastic.clients.transport.JsonEndpoint; import co.elastic.clients.transport.TransportException; +import co.elastic.clients.transport.TransportOptions; import co.elastic.clients.transport.Version; import co.elastic.clients.transport.endpoints.BinaryEndpoint; import co.elastic.clients.transport.endpoints.BooleanEndpoint; import co.elastic.clients.transport.endpoints.BooleanResponse; -import co.elastic.clients.transport.ElasticsearchTransport; -import co.elastic.clients.transport.Endpoint; -import co.elastic.clients.transport.TransportOptions; import co.elastic.clients.util.ApiTypeHelper; import co.elastic.clients.util.BinaryData; import co.elastic.clients.util.MissingRequiredPropertyException; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Scope; import jakarta.json.stream.JsonGenerator; import jakarta.json.stream.JsonParser; import org.apache.http.HttpEntity; @@ -77,8 +81,8 @@ public class RestClientTransport implements ElasticsearchTransport { JsonContentType = ContentType.APPLICATION_JSON; } else { JsonContentType = ContentType.create( - "application/vnd.elasticsearch+json", - new BasicNameValuePair("compatible-with", String.valueOf(Version.VERSION.major())) + "application/vnd.elasticsearch+json", + new BasicNameValuePair("compatible-with", String.valueOf(Version.VERSION.major())) ); } } @@ -103,15 +107,26 @@ public boolean cancel(boolean mayInterruptIfRunning) { private final RestClient restClient; private final JsonpMapper mapper; private final RestClientOptions transportOptions; + private final InstrumentationUtil instrumentationUtil; - public RestClientTransport(RestClient restClient, JsonpMapper mapper, @Nullable TransportOptions options) { + public RestClientTransport(RestClient restClient, JsonpMapper mapper, @Nullable OpenTelemetry openTelemetry, + @Nullable TransportOptions options) { this.restClient = restClient; this.mapper = mapper; this.transportOptions = options == null ? RestClientOptions.initialOptions() : RestClientOptions.of(options); + this.instrumentationUtil = new InstrumentationUtil(openTelemetry); + } + + public RestClientTransport(RestClient restClient, JsonpMapper mapper, @Nullable OpenTelemetry openTelemetry) { + this(restClient, mapper, openTelemetry, null); + } + + public RestClientTransport(RestClient restClient, JsonpMapper mapper, @Nullable TransportOptions options) { + this(restClient, mapper, null, options); } public RestClientTransport(RestClient restClient, JsonpMapper mapper) { - this(restClient, mapper, null); + this(restClient, mapper, null, null); } /** @@ -144,27 +159,42 @@ public void close() throws IOException { } public ResponseT performRequest( - RequestT request, - Endpoint endpoint, - @Nullable TransportOptions options + RequestT request, + Endpoint endpoint, + @Nullable TransportOptions options ) throws IOException { - - org.elasticsearch.client.Request clientReq = prepareLowLevelRequest(request, endpoint, options); - org.elasticsearch.client.Response clientResp = restClient.performRequest(clientReq); - return getHighLevelResponse(clientResp, endpoint); + Span span = instrumentationUtil.createSpanForRequest(request, endpoint); + + try (Scope ss = span.makeCurrent()) { + org.elasticsearch.client.Request clientReq = prepareLowLevelRequest(request, endpoint, options, span); + org.elasticsearch.client.Response clientResp = restClient.performRequest(clientReq); + instrumentationUtil.captureHostInformation(span, clientResp.getHost()); + return getHighLevelResponse(clientResp, endpoint); + } catch (Throwable throwable) { + span.setStatus(StatusCode.ERROR, throwable.getMessage()); + span.recordException(throwable); + throw throwable; + } finally { + span.end(); + } } public CompletableFuture performRequestAsync( - RequestT request, - Endpoint endpoint, - @Nullable TransportOptions options + RequestT request, + Endpoint endpoint, + @Nullable TransportOptions options ) { + Span span = instrumentationUtil.createSpanForRequest(request, endpoint); + RequestFuture future = new RequestFuture<>(); org.elasticsearch.client.Request clientReq; - try { - clientReq = prepareLowLevelRequest(request, endpoint, options); + try (Scope ss = span.makeCurrent()) { + clientReq = prepareLowLevelRequest(request, endpoint, options, span); } catch (Exception e) { // Terminate early + span.setStatus(StatusCode.ERROR, e.getMessage()); + span.recordException(e); + span.end(); future.completeExceptionally(e); return future; } @@ -176,18 +206,24 @@ public CompletableFuture performRequest @Override public void onSuccess(Response clientResp) { try (ApiTypeHelper.DisabledChecksHandle h = - ApiTypeHelper.DANGEROUS_disableRequiredPropertiesCheck(disableRequiredChecks)) { - + ApiTypeHelper.DANGEROUS_disableRequiredPropertiesCheck(disableRequiredChecks)) { + instrumentationUtil.captureHostInformation(span, clientResp.getHost()); ResponseT response = getHighLevelResponse(clientResp, endpoint); future.complete(response); - } catch (Exception e) { + span.setStatus(StatusCode.ERROR, e.getMessage()); + span.recordException(e); future.completeExceptionally(e); + } finally { + span.end(); } } @Override public void onFailure(Exception e) { + span.setStatus(StatusCode.ERROR, e.getMessage()); + span.recordException(e); + span.end(); future.completeExceptionally(e); } }); @@ -196,9 +232,10 @@ public void onFailure(Exception e) { } private org.elasticsearch.client.Request prepareLowLevelRequest( - RequestT request, - Endpoint endpoint, - @Nullable TransportOptions options + RequestT request, + Endpoint endpoint, + @Nullable TransportOptions options, + @Nullable Span span ) throws IOException { String method = endpoint.method(request); String path = endpoint.requestUrl(request); @@ -207,8 +244,8 @@ private org.elasticsearch.client.Request prepareLowLevelRequest( org.elasticsearch.client.Request clientReq = new org.elasticsearch.client.Request(method, path); RequestOptions restOptions = options == null ? - transportOptions.restClientRequestOptions() : - RestClientOptions.of(options).restClientRequestOptions(); + transportOptions.restClientRequestOptions() : + RestClientOptions.of(options).restClientRequestOptions(); if (restOptions != null) { clientReq.setOptions(restOptions); @@ -223,9 +260,9 @@ private org.elasticsearch.client.Request prepareLowLevelRequest( List lines = new ArrayList<>(); collectNdJsonLines(lines, (NdJsonpSerializable) request); clientReq.setEntity(new MultiBufferEntity(lines, JsonContentType)); - + instrumentationUtil.captureBody(span, request, endpoint, lines); } else if (body instanceof BinaryData) { - BinaryData data = (BinaryData)body; + BinaryData data = (BinaryData) body; // ES expects the Accept and Content-Type headers to be consistent. ContentType contentType; @@ -238,8 +275,8 @@ private org.elasticsearch.client.Request prepareLowLevelRequest( } clientReq.setEntity(new MultiBufferEntity( - Collections.singletonList(data.asByteBuffer()), - contentType + Collections.singletonList(data.asByteBuffer()), + contentType )); } else { @@ -248,6 +285,7 @@ private org.elasticsearch.client.Request prepareLowLevelRequest( mapper.serialize(body, generator); generator.close(); clientReq.setEntity(new ByteArrayEntity(baos.toByteArray(), JsonContentType)); + instrumentationUtil.captureBody(span, request, endpoint, baos); } } @@ -260,12 +298,12 @@ private org.elasticsearch.client.Request prepareLowLevelRequest( private void collectNdJsonLines(List lines, NdJsonpSerializable value) { Iterator values = value._serializables(); - while(values.hasNext()) { + while (values.hasNext()) { Object item = values.next(); if (item == null) { // Skip } else if (item instanceof NdJsonpSerializable && item != value) { // do not recurse on the item itself - collectNdJsonLines(lines, (NdJsonpSerializable)item); + collectNdJsonLines(lines, (NdJsonpSerializable) item); } else { // TODO: items that aren't already BinaryData could be serialized to ByteBuffers lazily // to reduce the number of buffers to keep in memory @@ -281,7 +319,7 @@ private void collectNdJsonLines(List lines, NdJsonpSerializable valu */ private void writeNdJson(NdJsonpSerializable value, ByteArrayOutputStream baos) throws IOException { Iterator values = value._serializables(); - while(values.hasNext()) { + while (values.hasNext()) { Object item = values.next(); if (item instanceof NdJsonpSerializable && item != value) { // do not recurse on the item itself writeNdJson((NdJsonpSerializable) item, baos); @@ -295,8 +333,8 @@ private void writeNdJson(NdJsonpSerializable value, ByteArrayOutputStream baos) } private ResponseT getHighLevelResponse( - org.elasticsearch.client.Response clientResp, - Endpoint endpoint + org.elasticsearch.client.Response clientResp, + Endpoint endpoint ) throws IOException { int statusCode = clientResp.getStatusLine().getStatusCode(); @@ -310,16 +348,16 @@ private ResponseT getHighLevelResponse( JsonpDeserializer errorDeserializer = endpoint.errorDeserializer(statusCode); if (errorDeserializer == null) { throw new TransportException( - "Request failed with status code '" + statusCode + "'", - endpoint.id(), new ResponseException(clientResp) + "Request failed with status code '" + statusCode + "'", + endpoint.id(), new ResponseException(clientResp) ); } HttpEntity entity = clientResp.getEntity(); if (entity == null) { throw new TransportException( - "Expecting a response body, but none was sent", - endpoint.id(), new ResponseException(clientResp) + "Expecting a response body, but none was sent", + endpoint.id(), new ResponseException(clientResp) ); } @@ -333,12 +371,12 @@ private ResponseT getHighLevelResponse( // TODO: have the endpoint provide the exception constructor throw new ElasticsearchException(endpoint.id(), (ErrorResponse) error); } - } catch(MissingRequiredPropertyException errorEx) { + } catch (MissingRequiredPropertyException errorEx) { // Could not decode exception, try the response type try { ResponseT response = decodeResponse(statusCode, entity, clientResp, endpoint); return response; - } catch(Exception respEx) { + } catch (Exception respEx) { // No better luck: throw the original error decoding exception throw new TransportException("Failed to decode error response", endpoint.id(), new ResponseException(clientResp)); } @@ -355,7 +393,7 @@ private ResponseT getHighLevelResponse( } private ResponseT decodeResponse( - int statusCode, @Nullable HttpEntity entity, Response clientResp, Endpoint endpoint + int statusCode, @Nullable HttpEntity entity, Response clientResp, Endpoint endpoint ) throws IOException { if (endpoint instanceof JsonEndpoint) { @@ -368,8 +406,8 @@ private ResponseT decodeResponse( // Expecting a body if (entity == null) { throw new TransportException( - "Expecting a response body, but none was sent", - endpoint.id(), new ResponseException(clientResp) + "Expecting a response body, but none was sent", + endpoint.id(), new ResponseException(clientResp) ); } InputStream content = entity.getContent(); @@ -379,7 +417,7 @@ private ResponseT decodeResponse( } return response; - } else if(endpoint instanceof BooleanEndpoint) { + } else if (endpoint instanceof BooleanEndpoint) { BooleanEndpoint bep = (BooleanEndpoint) endpoint; @SuppressWarnings("unchecked") @@ -401,7 +439,7 @@ private ResponseT decodeResponse( // Endpoints that (incorrectly) do not return the Elastic product header private static final Set endpointsMissingProductHeader = new HashSet<>(Arrays.asList( - "es/snapshot.create" // #74 / elastic/elasticsearch#82358 + "es/snapshot.create" // #74 / elastic/elasticsearch#82358 )); private void checkProductHeader(Response clientResp, Endpoint endpoint) throws IOException { @@ -411,17 +449,17 @@ private void checkProductHeader(Response clientResp, Endpoint endpoint) return; } throw new TransportException( - "Missing [X-Elastic-Product] header. Please check that you are connecting to an Elasticsearch " - + "instance, and that any networking filters are preserving that header.", - endpoint.id(), - new ResponseException(clientResp) + "Missing [X-Elastic-Product] header. Please check that you are connecting to an Elasticsearch " + + "instance, and that any networking filters are preserving that header.", + endpoint.id(), + new ResponseException(clientResp) ); } if (!"Elasticsearch".equals(header)) { throw new TransportException("Invalid value '" + header + "' for 'X-Elastic-Product' header.", - endpoint.id(), - new ResponseException(clientResp) + endpoint.id(), + new ResponseException(clientResp) ); } } diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/experiments/api/FooOptRequest.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/experiments/api/FooOptRequest.java index bfad18282..2f246100e 100644 --- a/java-client/src/test/java/co/elastic/clients/elasticsearch/experiments/api/FooOptRequest.java +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/experiments/api/FooOptRequest.java @@ -262,6 +262,8 @@ public FooOptRequest build() { "foo", r -> "POST", r -> "/foo", + r -> "/foo", + SimpleEndpoint.emptyMap(), SimpleEndpoint.emptyMap(), SimpleEndpoint.emptyMap(), true, diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/experiments/api/FooRequest.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/experiments/api/FooRequest.java index 4e1d894b1..64c060d5c 100644 --- a/java-client/src/test/java/co/elastic/clients/elasticsearch/experiments/api/FooRequest.java +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/experiments/api/FooRequest.java @@ -302,6 +302,8 @@ public static JsonpDeserializer parser() { "foo", r -> "POST", r -> "/foo", + r -> "/foo", + SimpleEndpoint.emptyMap(), SimpleEndpoint.emptyMap(), SimpleEndpoint.emptyMap(), true, diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/experiments/generics/GenericClass.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/experiments/generics/GenericClass.java index 0ee0418b3..0be527e57 100644 --- a/java-client/src/test/java/co/elastic/clients/elasticsearch/experiments/generics/GenericClass.java +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/experiments/generics/GenericClass.java @@ -111,6 +111,10 @@ public static Endpoint, ErrorResponse // Request path request -> "/genclass", + // Route + request -> "/genclass", + SimpleEndpoint.emptyMap(), + // Request parameters SimpleEndpoint.emptyMap(), SimpleEndpoint.emptyMap(), diff --git a/java-client/src/test/java/co/elastic/clients/json/JsonpUtilsTest.java b/java-client/src/test/java/co/elastic/clients/json/JsonpUtilsTest.java index c7db13643..2f82a7e35 100644 --- a/java-client/src/test/java/co/elastic/clients/json/JsonpUtilsTest.java +++ b/java-client/src/test/java/co/elastic/clients/json/JsonpUtilsTest.java @@ -163,7 +163,7 @@ public void testJsonString() { IndicesPrivileges priv = IndicesPrivileges.of(i -> i .names("bar") .query(q -> q.term(t -> t.field("baz").value(1))) - .privileges(IndexPrivilege.All) + .privileges(IndexPrivilege.All.jsonValue()) ); String json = "{\"names\":[\"bar\"],\"privileges\":[\"all\"],\"query\":\"{\\\"term\\\":{\\\"baz\\\":{\\\"value\\\":1}}}\"}"; @@ -179,7 +179,7 @@ public void testJsonString() { IndicesPrivileges priv = IndicesPrivileges.of(i -> i .names("bar") .query(q -> q._custom("template", RoleTemplateScript.of(s -> s.stored(v -> v.id("foo"))))) - .privileges(IndexPrivilege.All) + .privileges(IndexPrivilege.All.jsonValue()) ); String json = "{\"names\":[\"bar\"],\"privileges\":[\"all\"],\"query\":\"{\\\"template\\\":{\\\"id\\\":\\\"foo\\\"}}\"}"; @@ -195,7 +195,7 @@ public void testJsonString() { UserIndicesPrivileges priv = UserIndicesPrivileges.of(i -> i .names("bar") .query(q -> q.term(t -> t.field("baz").value(1))) - .privileges(IndexPrivilege.All) + .privileges(IndexPrivilege.All.jsonValue()) .allowRestrictedIndices(false) ); diff --git a/java-client/src/test/java/co/elastic/clients/transport/rest_client/InstrumentationTest.java b/java-client/src/test/java/co/elastic/clients/transport/rest_client/InstrumentationTest.java new file mode 100644 index 000000000..3d6f8d20e --- /dev/null +++ b/java-client/src/test/java/co/elastic/clients/transport/rest_client/InstrumentationTest.java @@ -0,0 +1,241 @@ +package co.elastic.clients.transport.rest_client; + +import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient; +import co.elastic.clients.elasticsearch.ElasticsearchClient; +import co.elastic.clients.elasticsearch._types.query_dsl.Query; +import co.elastic.clients.json.jackson.JacksonJsonpMapper; +import com.sun.net.httpserver.HttpServer; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import org.apache.http.HttpHost; +import org.elasticsearch.client.RestClient; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class InstrumentationTest { + private static final String INDEX = "test-index"; + private static final String DOC_ID = "1234567"; + private static final String DOC_RESPONSE = "{\n" + + " \"_index\": \"" + INDEX + "\",\n" + + " \"_id\": \"" + DOC_ID + "\",\n" + + " \"_version\": 1,\n" + + " \"_seq_no\": 0,\n" + + " \"_primary_term\": 1,\n" + + " \"found\": true,\n" + + " \"_source\": {\n" + + " \"@timestamp\": \"2099-11-15T14:12:12\",\n" + + " \"message\": \"GET /search HTTP/1.1 200 1070000\"\n" + + " }\n" + + "}"; + private static final String SEARCH_RESPONSE = "{\n" + + " \"took\": 5,\n" + + " \"timed_out\": false,\n" + + " \"_shards\": {\n" + + " \"total\": 1,\n" + + " \"successful\": 1,\n" + + " \"skipped\": 0,\n" + + " \"failed\": 0\n" + + " },\n" + + " \"hits\": {\n" + + " \"total\": {\n" + + " \"value\": 1,\n" + + " \"relation\": \"eq\"\n" + + " },\n" + + " \"max_score\": 1.3862942,\n" + + " \"hits\": [\n" + + " {\n" + + " \"_index\": \"" + INDEX + "\",\n" + + " \"_id\": \"" + DOC_ID + "\",\n" + + " \"_score\": 1.3862942,\n" + + " \"_source\": {\n" + + " \"@timestamp\": \"2099-11-15T14:12:12\",\n" + + " \"message\": \"GET /search HTTP/1.1 200 1070000\"\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n" + + "}"; + public static final String DB_ELASTICSEARCH_TARGET = "db.elasticsearch.target"; + public static final String DB_ELASTICSEARCH_DOC_ID = "db.elasticsearch.doc_id"; + public static final String URL_PATH = "url.path"; + public static final String URL_SCHEME = "url.scheme"; + public static final String SERVER_ADDRESS = "server.address"; + public static final String SERVER_PORT = "server.port"; + private static HttpServer httpServer; + private static MockSpanExporter spanExporter; + private static OpenTelemetry openTelemetry; + private static RestClientTransport transport; + private static ElasticsearchClient client; + private static ElasticsearchAsyncClient asyncClient; + + @BeforeAll + public static void setup() throws IOException { + setupOTel(); + setupHttpServer(); + setupClient(); + } + + @AfterAll + public static void cleanUp() throws IOException { + httpServer.stop(0); + transport.close(); + } + + private static void setupClient() { + RestClient restClient = + RestClient.builder(new HttpHost(httpServer.getAddress().getAddress(), httpServer.getAddress().getPort())).build(); + + transport = new RestClientTransport(restClient, new JacksonJsonpMapper()); + + client = new ElasticsearchClient(transport); + asyncClient = new ElasticsearchAsyncClient(transport); + } + + private static void setupHttpServer() throws IOException { + httpServer = HttpServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0); + + // handler for GetRequest + httpServer.createContext("/" + INDEX + "/_doc/" + DOC_ID, exchange -> { + exchange.getResponseHeaders().set("X-Elastic-Product", "Elasticsearch"); + exchange.sendResponseHeaders(200, 0); + exchange.getResponseBody().write(DOC_RESPONSE.getBytes()); + exchange.close(); + }); + + // handler for SearchRequest + httpServer.createContext("/" + INDEX + "/_search", exchange -> { + exchange.getResponseHeaders().set("X-Elastic-Product", "Elasticsearch"); + exchange.sendResponseHeaders(200, 0); + exchange.getResponseBody().write(SEARCH_RESPONSE.getBytes()); + exchange.close(); + }); + + httpServer.start(); + } + + private static void setupOTel() { + Resource resource = Resource.getDefault() + .merge(Resource.create(Attributes.of(ResourceAttributes.SERVICE_NAME, "es-api-test"))); + + spanExporter = new MockSpanExporter(); + + SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) + .setResource(resource) + .build(); + + openTelemetry = OpenTelemetrySdk.builder() + .setTracerProvider(sdkTracerProvider) + .buildAndRegisterGlobal(); + } + + @BeforeEach + public void reset() { + spanExporter.reset(); + } + + @Test + public void testGetRequest() throws IOException, InterruptedException { + client.get(r -> r.index(INDEX).id(DOC_ID), Object.class); + Assertions.assertEquals(spanExporter.getSpans().size(), 1); + SpanData span = spanExporter.getSpans().get(0); + Assertions.assertEquals("GET /{index}/_doc/{id}", span.getName()); + Assertions.assertEquals(INDEX, span.getAttributes().get(AttributeKey.stringKey(DB_ELASTICSEARCH_TARGET))); + Assertions.assertEquals(DOC_ID, span.getAttributes().get(AttributeKey.stringKey(DB_ELASTICSEARCH_DOC_ID))); + Assertions.assertEquals("GET", span.getAttributes().get(SemanticAttributes.HTTP_METHOD)); + Assertions.assertEquals("elasticsearch", span.getAttributes().get(SemanticAttributes.DB_SYSTEM)); + Assertions.assertEquals("/" + INDEX + "/_doc/" + DOC_ID, span.getAttributes().get(AttributeKey.stringKey(URL_PATH))); + Assertions.assertEquals("http", span.getAttributes().get(AttributeKey.stringKey(URL_SCHEME))); + Assertions.assertEquals(httpServer.getAddress().getHostName(), span.getAttributes().get(AttributeKey.stringKey(SERVER_ADDRESS))); + Assertions.assertEquals(httpServer.getAddress().getPort(), span.getAttributes().get(AttributeKey.longKey(SERVER_PORT))); + } + + @Test + public void testSearchRequest() throws IOException, InterruptedException { + Query query = Query.of(q -> q.term(t -> t.field("x").value("y"))); + String queryAsString = query.toString().replace("Query: ", "{\"query\":") + "}"; + client.search(r -> r.index(INDEX).query(query), Object.class); + Assertions.assertEquals(spanExporter.getSpans().size(), 1); + SpanData span = spanExporter.getSpans().get(0); + Assertions.assertEquals("POST /{index}/_search", span.getName()); + Assertions.assertEquals(INDEX, span.getAttributes().get(AttributeKey.stringKey(DB_ELASTICSEARCH_TARGET))); + Assertions.assertEquals(queryAsString, span.getAttributes().get(SemanticAttributes.DB_STATEMENT)); + } + + @Test + public void testAsyncSearchRequest() throws IOException, InterruptedException, TimeoutException, ExecutionException { + Query query = Query.of(q -> q.term(t -> t.field("x").value("y"))); + String queryAsString = query.toString().replace("Query: ", "{\"query\":") + "}"; + Future future = asyncClient.search(r -> r.index(INDEX).query(query), Object.class); + future.get(2, TimeUnit.SECONDS); + spanExporter.awaitNumSpans(1, 2000); + Assertions.assertEquals(spanExporter.getSpans().size(), 1); + SpanData span = spanExporter.getSpans().get(0); + Assertions.assertEquals("POST /{index}/_search", span.getName()); + Assertions.assertEquals(INDEX, span.getAttributes().get(AttributeKey.stringKey(DB_ELASTICSEARCH_TARGET))); + Assertions.assertEquals(queryAsString, span.getAttributes().get(SemanticAttributes.DB_STATEMENT)); + } + + private static class MockSpanExporter implements SpanExporter { + + private final List spans = new ArrayList(); + + @Override + public CompletableResultCode export(Collection spans) { + this.spans.addAll(spans); + synchronized (this) { + notifyAll(); + } + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode shutdown() { + spans.clear(); + return CompletableResultCode.ofSuccess(); + } + + public List getSpans() { + return spans; + } + + public void reset() { + spans.clear(); + } + + public synchronized void awaitNumSpans(int num, long timeoutMillis) throws InterruptedException { + while(spans.size() < num){ + wait(timeoutMillis); + } + } + } +}