Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async client PoC #317

Merged
merged 1 commit into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/main/java/io/weaviate/client/WeaviateClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.weaviate.client.base.util.DbVersionProvider;
import io.weaviate.client.base.util.DbVersionSupport;
import io.weaviate.client.base.util.GrpcVersionSupport;
import io.weaviate.client.v1.async.WeaviateAsyncClient;
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;
import io.weaviate.client.v1.backup.Backup;
import io.weaviate.client.v1.batch.Batch;
Expand Down Expand Up @@ -44,6 +45,10 @@ public WeaviateClient(Config config, HttpClient httpClient, AccessTokenProvider
this.tokenProvider = tokenProvider;
}

public WeaviateAsyncClient async() {
return new WeaviateAsyncClient(config);
}

public Misc misc() {
return new Misc(httpClient, config, dbVersionProvider);
}
Expand Down
75 changes: 75 additions & 0 deletions src/main/java/io/weaviate/client/base/AsyncBaseClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package io.weaviate.client.base;

import io.weaviate.client.Config;
import io.weaviate.client.base.http.async.ResponseParser;
import io.weaviate.client.base.http.async.WeaviateResponseConsumer;
import java.util.concurrent.Future;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpHeaders;

public abstract class AsyncBaseClient<T> {
private final CloseableHttpAsyncClient client;
private final Config config;
private final Serializer serializer;

public AsyncBaseClient(CloseableHttpAsyncClient client, Config config) {
this.client = client;
this.config = config;
this.serializer = new Serializer();
}

protected Future<Result<T>> sendGetRequest(String endpoint, Class<T> classOfT, FutureCallback<Result<T>> callback) {
return sendRequest(endpoint, null, "GET", classOfT, callback, null);
}

protected Future<Result<T>> sendGetRequest(String endpoint, FutureCallback<Result<T>> callback, ResponseParser<T> parser) {
return sendRequest(endpoint, null, "GET", null, callback, parser);
}

protected Future<Result<T>> sendPostRequest(String endpoint, Object payload, Class<T> classOfT, FutureCallback<Result<T>> callback) {
return sendRequest(endpoint, payload, "POST", classOfT, callback, null);
}

protected Future<Result<T>> sendPostRequest(String endpoint, Object payload, FutureCallback<Result<T>> callback, ResponseParser<T> parser) {
return sendRequest(endpoint, payload, "POST", null, callback, parser);
}

protected Future<Result<T>> sendPutRequest(String endpoint, Object payload, Class<T> classOfT, FutureCallback<Result<T>> callback) {
return sendRequest(endpoint, payload, "PUT", classOfT, callback, null);
}

protected Future<Result<T>> sendPutRequest(String endpoint, Object payload, FutureCallback<Result<T>> callback, ResponseParser<T> parser) {
return sendRequest(endpoint, payload, "PUT", null, callback, parser);
}

protected Future<Result<T>> sendDeleteRequest(String endpoint, Object payload, Class<T> classOfT, FutureCallback<Result<T>> callback) {
return sendRequest(endpoint, payload, "DELETE", classOfT, callback, null);
}

protected Future<Result<T>> sendDeleteRequest(String endpoint, Object payload, FutureCallback<Result<T>> callback, ResponseParser<T> parser) {
return sendRequest(endpoint, payload, "DELETE", null, callback, parser);
}

protected Future<Result<T>> sendHeadRequest(String endpoint, Class<T> classOfT, FutureCallback<Result<T>> callback) {
return sendRequest(endpoint, null, "HEAD", classOfT, callback, null);
}

protected Future<Result<T>> sendHeadRequest(String endpoint, FutureCallback<Result<T>> callback, ResponseParser<T> parser) {
return sendRequest(endpoint, null, "HEAD", null, callback, parser);
}

private Future<Result<T>> sendRequest(String endpoint, Object payload, String method, Class<T> classOfT, FutureCallback<Result<T>> callback,
ResponseParser<T> parser) {
SimpleHttpRequest req = new SimpleHttpRequest(method, String.format("%s%s", config.getBaseURL(), endpoint));
req.addHeader(HttpHeaders.ACCEPT, "*/*");
req.addHeader(HttpHeaders.CONTENT_TYPE, "application/json");
if (payload != null) {
req.setBody(serializer.toJsonString(payload), ContentType.APPLICATION_JSON);
}
return client.execute(SimpleRequestProducer.create(req), new WeaviateResponseConsumer<>(classOfT, parser), callback);
}
}
9 changes: 9 additions & 0 deletions src/main/java/io/weaviate/client/base/AsyncClientResult.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.weaviate.client.base;

import java.util.concurrent.Future;
import org.apache.hc.core5.concurrent.FutureCallback;

public interface AsyncClientResult<T> {
Future<Result<T>> run();
Future<Result<T>> run(FutureCallback<Result<T>> callback);
}
27 changes: 2 additions & 25 deletions src/main/java/io/weaviate/client/base/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
import io.weaviate.client.Config;
import io.weaviate.client.base.http.HttpClient;
import io.weaviate.client.base.http.HttpResponse;
import io.weaviate.client.v1.graphql.GraphQL;
import io.weaviate.client.v1.graphql.model.GraphQLResponse;
import java.util.Collections;
import java.util.List;

public abstract class BaseClient<T> {
private final HttpClient client;
Expand Down Expand Up @@ -53,12 +50,7 @@ private Response<T> sendRequest(String endpoint, Object payload, String method,

if (statusCode < 399) {
T body = toResponse(responseBody, classOfT);
WeaviateErrorResponse errors = null;

if (body != null && classOfT.equals(GraphQL.class)) {
errors = getWeaviateGraphQLErrorResponse((GraphQLResponse) body, statusCode);
}
return new Response<>(statusCode, body, errors);
return new Response<>(statusCode, body, null);
}

WeaviateErrorResponse error = toResponse(responseBody, WeaviateErrorResponse.class);
Expand Down Expand Up @@ -89,7 +81,7 @@ private HttpResponse sendHttpRequest(String address, String json, String method)
}

private <C> C toResponse(String response, Class<C> classOfT) {
return serializer.toResponse(response, classOfT);
return serializer.toObject(response, classOfT);
}

private String toJsonString(Object object) {
Expand All @@ -100,19 +92,4 @@ private WeaviateErrorResponse getWeaviateErrorResponse(Exception e) {
WeaviateErrorMessage error = WeaviateErrorMessage.builder().message(e.getMessage()).throwable(e).build();
return WeaviateErrorResponse.builder().error(Collections.singletonList(error)).build();
}

/**
* Extract errors from {@link WeaviateErrorResponse} from a GraphQL response body.
*
* @param gql GraphQL response body.
* @param code HTTP status code to pass in the {@link WeaviateErrorResponse}.
* @return Error response to be returned to the caller.
*/
private WeaviateErrorResponse getWeaviateGraphQLErrorResponse(GraphQLResponse gql, int code) {
List<WeaviateErrorMessage> messages = gql.errorMessages();
if (messages == null || messages.isEmpty()) {
return null;
}
return WeaviateErrorResponse.builder().code(code).error(gql.errorMessages()).build();
}
}
29 changes: 27 additions & 2 deletions src/main/java/io/weaviate/client/base/Response.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,40 @@
package io.weaviate.client.base;

import io.weaviate.client.v1.graphql.model.GraphQLResponse;
import java.util.List;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.experimental.FieldDefaults;

@Getter
@AllArgsConstructor
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
public class Response<T> {
int statusCode;
T body;
WeaviateErrorResponse errors;

public Response(int statusCode, T body, WeaviateErrorResponse errors) {
this.statusCode = statusCode;
this.body = body;
if (body instanceof GraphQLResponse) {
this.errors = getWeaviateGraphQLErrorResponse((GraphQLResponse) body, statusCode);;
} else {
this.errors = errors;
}
}

/**
* Extract errors from {@link WeaviateErrorResponse} from a GraphQL response body.
*
* @param gql GraphQL response body.
* @param code HTTP status code to pass in the {@link WeaviateErrorResponse}.
* @return Error response to be returned to the caller.
*/
private WeaviateErrorResponse getWeaviateGraphQLErrorResponse(GraphQLResponse gql, int code) {
List<WeaviateErrorMessage> messages = gql.errorMessages();
if (messages == null || messages.isEmpty()) {
return null;
}
return WeaviateErrorResponse.builder().code(code).error(gql.errorMessages()).build();
}
}
21 changes: 20 additions & 1 deletion src/main/java/io/weaviate/client/base/Serializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,30 @@ public Serializer() {
this.gson = new GsonBuilder().disableHtmlEscaping().create();
}

public <C> C toResponse(String response, Class<C> classOfT) {
public <T> T toObject(String response, Class<T> classOfT) {
return gson.fromJson(response, classOfT);
}

public String toJsonString(Object object) {
return (object != null) ? gson.toJson(object) : null;
}

public <T> Result<T> toResult(int statusCode, String body, Class<T> classOfT) {
if (statusCode < 399) {
return new Result<>(toResponse(statusCode, body, classOfT));
}
return new Result<>(statusCode, null, toWeaviateError(body));
}

public <T> Response<T> toResponse(int statusCode, String body, Class<T> classOfT) {
if (statusCode < 399) {
T obj = toObject(body, classOfT);
return new Response<>(statusCode, obj, null);
}
return new Response<>(statusCode, null, toWeaviateError(body));
}

public WeaviateErrorResponse toWeaviateError(String body) {
return toObject(body, WeaviateErrorResponse.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.weaviate.client.base.http.async;

import io.weaviate.client.Config;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.util.Timeout;

public class AsyncHttpClient {

public static CloseableHttpAsyncClient create(Config config) {
IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
.setSoTimeout(Timeout.ofSeconds(config.getSocketTimeout()))
.build();

return HttpAsyncClients.custom()
.setIOReactorConfig(ioReactorConfig)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.weaviate.client.base.http.async;

import io.weaviate.client.base.Result;
import io.weaviate.client.base.Serializer;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpResponse;

public abstract class ResponseParser<T> {
protected final Serializer serializer;

public ResponseParser() {
this.serializer = new Serializer();
}

public abstract Result<T> parse(HttpResponse response, String body, ContentType contentType);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.weaviate.client.base.http.async;

import io.weaviate.client.base.Result;
import io.weaviate.client.base.Serializer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityConsumer;
import org.apache.hc.core5.http.nio.support.AbstractAsyncResponseConsumer;
import org.apache.hc.core5.http.protocol.HttpContext;

public class WeaviateResponseConsumer<T> extends AbstractAsyncResponseConsumer<Result<T>, byte[]> {
private final Serializer serializer;
private final Class<T> classOfT;
private final ResponseParser<T> parser;

public WeaviateResponseConsumer(Class<T> classOfT, ResponseParser<T> parser) {
super(new BasicAsyncEntityConsumer());
this.serializer = new Serializer();
this.classOfT = classOfT;
this.parser = parser;
}

@Override
protected Result<T> buildResult(HttpResponse response, byte[] entity, ContentType contentType) {
String body = new String(entity, StandardCharsets.UTF_8);
if (this.parser != null) {
return this.parser.parse(response, body, contentType);
}
return serializer.toResult(response.getCode(), body, classOfT);
}

@Override
public void informationResponse(HttpResponse response, HttpContext context) throws HttpException, IOException {
}
}
37 changes: 37 additions & 0 deletions src/main/java/io/weaviate/client/v1/async/WeaviateAsyncClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.weaviate.client.v1.async;

import io.weaviate.client.Config;
import io.weaviate.client.base.http.async.AsyncHttpClient;
import io.weaviate.client.v1.async.misc.Misc;
import io.weaviate.client.v1.async.schema.Schema;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.core5.io.CloseMode;

public class WeaviateAsyncClient implements AutoCloseable {
private final Config config;
private final CloseableHttpAsyncClient client;

public WeaviateAsyncClient(Config config) {
this.config = config;
this.client = AsyncHttpClient.create(config);
// auto start the client
this.start();
}

public Misc misc() {
return new Misc(client, config);
}

public Schema schema() {
return new Schema(client, config);
}

private void start() {
this.client.start();
}

@Override
public void close() {
this.client.close(CloseMode.GRACEFUL);
}
}
34 changes: 34 additions & 0 deletions src/main/java/io/weaviate/client/v1/async/misc/Misc.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.weaviate.client.v1.async.misc;

import io.weaviate.client.Config;
import io.weaviate.client.v1.async.misc.api.LiveChecker;
import io.weaviate.client.v1.async.misc.api.MetaGetter;
import io.weaviate.client.v1.async.misc.api.OpenIDConfigGetter;
import io.weaviate.client.v1.async.misc.api.ReadyChecker;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;

public class Misc {
private final CloseableHttpAsyncClient client;
private final Config config;

public Misc(CloseableHttpAsyncClient client, Config config) {
this.client = client;
this.config = config;
}

public MetaGetter metaGetter() {
return new MetaGetter(client, config);
}

public OpenIDConfigGetter openIDConfigGetter() {
return new OpenIDConfigGetter(client, config);
}

public LiveChecker liveChecker() {
return new LiveChecker(client, config);
}

public ReadyChecker readyChecker() {
return new ReadyChecker(client, config);
}
}
Loading
Loading