Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: async support for objects batcher (refactor) #333

Merged
merged 3 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions src/main/java/io/weaviate/client/base/util/Futures.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.weaviate.client.base.util;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

public class Futures {

private Futures() {
}

public static <T> CompletableFuture<T> supplyDelayed(Supplier<CompletableFuture<T>> supplier, long millis,
Executor executor) throws InterruptedException {
if (executor instanceof ScheduledExecutorService) {
return CompletableFuture.supplyAsync(
supplier,
command -> ((ScheduledExecutorService) executor).schedule(command, millis, TimeUnit.MILLISECONDS)
).thenCompose(f -> f);
}
Thread.sleep(millis);
return supplier.get();
}

public static <T, U> CompletableFuture<U> thenComposeAsync(CompletableFuture<T> future, Function<T, CompletableFuture<U>> callback,
Executor executor) {
if (executor != null) {
return future.thenComposeAsync(callback, executor);
}
return future.thenComposeAsync(callback);
}

public static <T> CompletableFuture<T> handleAsync(CompletableFuture<T> future, BiFunction<T, Throwable, CompletableFuture<T>> callback,
Executor executor) {
if (executor != null) {
return future.handleAsync(callback, executor).thenCompose(f -> f);
}
return future.handleAsync(callback).thenCompose(f -> f);
}

public static <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier, Executor executor) {
if (executor != null) {
return CompletableFuture.supplyAsync(supplier, executor);
}
return CompletableFuture.supplyAsync(supplier);
}
}
14 changes: 12 additions & 2 deletions src/main/java/io/weaviate/client/v1/async/backup/Backup.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import lombok.RequiredArgsConstructor;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;

import java.util.concurrent.Executor;

@RequiredArgsConstructor
public class Backup {

Expand All @@ -18,15 +20,23 @@ public class Backup {


public BackupCreator creator() {
return new BackupCreator(client, config, createStatusGetter());
return creator(null);
}

public BackupCreator creator(Executor executor) {
return new BackupCreator(client, config, createStatusGetter(), executor);
}

public BackupCreateStatusGetter createStatusGetter() {
return new BackupCreateStatusGetter(client, config);
}

public BackupRestorer restorer() {
return new BackupRestorer(client, config, restoreStatusGetter());
return restorer(null);
}

public BackupRestorer restorer(Executor executor) {
return new BackupRestorer(client, config, restoreStatusGetter(), executor);
}

public BackupRestoreStatusGetter restoreStatusGetter() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.weaviate.client.base.WeaviateError;
import io.weaviate.client.base.WeaviateErrorMessage;
import io.weaviate.client.base.WeaviateErrorResponse;
import io.weaviate.client.base.util.Futures;
import io.weaviate.client.base.util.UrlEncoder;
import io.weaviate.client.v1.backup.model.BackupCreateResponse;
import io.weaviate.client.v1.backup.model.BackupCreateStatusResponse;
Expand All @@ -22,6 +23,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;

public class BackupCreator extends AsyncBaseClient<BackupCreateResponse>
Expand All @@ -36,11 +38,13 @@ public class BackupCreator extends AsyncBaseClient<BackupCreateResponse>
private String backupId;
private BackupCreateConfig config;
private boolean waitForCompletion;
private final Executor executor;


public BackupCreator(CloseableHttpAsyncClient client, Config config, BackupCreateStatusGetter statusGetter) {
public BackupCreator(CloseableHttpAsyncClient client, Config config, BackupCreateStatusGetter statusGetter, Executor executor) {
super(client, config);
this.statusGetter = statusGetter;
this.executor = executor;
}


Expand Down Expand Up @@ -158,7 +162,7 @@ public void cancelled() {

private CompletableFuture<Result<BackupCreateResponse>> getStatusRecursively(String backend, String backupId,
Result<BackupCreateResponse> createResult) {
return getStatus(backend, backupId).thenCompose(createStatusResult -> {
return Futures.thenComposeAsync(getStatus(backend, backupId), createStatusResult -> {
boolean isRunning = Optional.of(createStatusResult)
.filter(r -> !r.hasErrors())
.map(Result::getResult)
Expand All @@ -176,14 +180,13 @@ private CompletableFuture<Result<BackupCreateResponse>> getStatusRecursively(Str

if (isRunning) {
try {
Thread.sleep(WAIT_INTERVAL);
return getStatusRecursively(backend, backupId, createResult);
return Futures.supplyDelayed(() -> getStatusRecursively(backend, backupId, createResult), WAIT_INTERVAL, executor);
} catch (InterruptedException e) {
throw new CompletionException(e);
}
}
return CompletableFuture.completedFuture(merge(createStatusResult, createResult));
});
}, executor);
}

private Result<BackupCreateResponse> merge(Result<BackupCreateStatusResponse> createStatusResult,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.weaviate.client.base.WeaviateError;
import io.weaviate.client.base.WeaviateErrorMessage;
import io.weaviate.client.base.WeaviateErrorResponse;
import io.weaviate.client.base.util.Futures;
import io.weaviate.client.base.util.UrlEncoder;
import io.weaviate.client.v1.backup.model.BackupRestoreResponse;
import io.weaviate.client.v1.backup.model.BackupRestoreStatusResponse;
Expand All @@ -22,6 +23,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;

public class BackupRestorer extends AsyncBaseClient<BackupRestoreResponse>
Expand All @@ -36,11 +38,13 @@ public class BackupRestorer extends AsyncBaseClient<BackupRestoreResponse>
private String backupId;
private BackupRestoreConfig config;
private boolean waitForCompletion;
private final Executor executor;


public BackupRestorer(CloseableHttpAsyncClient client, Config config, BackupRestoreStatusGetter statusGetter) {
public BackupRestorer(CloseableHttpAsyncClient client, Config config, BackupRestoreStatusGetter statusGetter, Executor executor) {
super(client, config);
this.statusGetter = statusGetter;
this.executor = executor;
}


Expand Down Expand Up @@ -158,7 +162,7 @@ public void cancelled() {

private CompletableFuture<Result<BackupRestoreResponse>> getStatusRecursively(String backend, String backupId,
Result<BackupRestoreResponse> restoreResult) {
return getStatus(backend, backupId).thenCompose(restoreStatusResult -> {
return Futures.thenComposeAsync(getStatus(backend, backupId), restoreStatusResult -> {
boolean isRunning = Optional.of(restoreStatusResult)
.filter(r -> !r.hasErrors())
.map(Result::getResult)
Expand All @@ -176,14 +180,13 @@ private CompletableFuture<Result<BackupRestoreResponse>> getStatusRecursively(St

if (isRunning) {
try {
Thread.sleep(WAIT_INTERVAL);
return getStatusRecursively(backend, backupId, restoreResult);
return Futures.supplyDelayed(() -> getStatusRecursively(backend, backupId, restoreResult), WAIT_INTERVAL, executor);
} catch (InterruptedException e) {
throw new CompletionException(e);
}
}
return CompletableFuture.completedFuture(merge(restoreStatusResult, restoreResult));
});
}, executor);
}

private Result<BackupRestoreResponse> merge(Result<BackupRestoreStatusResponse> restoreStatusResult,
Expand Down
Loading