Skip to content

Commit

Permalink
[fix][broker] Use ForkJoinPool.commonPool to handle Metadata operations
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Apr 19, 2024
1 parent 97153dc commit 76c1b4e
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -542,27 +542,31 @@ public void invalidateCaches(String...paths) {
}

/**
* Run the task in the executor thread and fail the future if the executor is shutting down.
* Run the task in the ForkJoinPool.commonPool thread and fail the future if exceptionally.
*/
@VisibleForTesting
public void execute(Runnable task, CompletableFuture<?> future) {
try {
executor.execute(task);
} catch (Throwable t) {
future.completeExceptionally(t);
}
CompletableFuture.runAsync(task).exceptionally(e -> {
if (!future.isDone()) {
future.completeExceptionally(e);
}
return null;
});
}

/**
* Run the task in the executor thread and fail the future if the executor is shutting down.
* Run the task in the ForkJoinPool.commonPool thread and fail the future if exceptionally.
*/
@VisibleForTesting
public void execute(Runnable task, Supplier<List<CompletableFuture<?>>> futures) {
try {
executor.execute(task);
} catch (final Throwable t) {
futures.get().forEach(f -> f.completeExceptionally(t));
}
CompletableFuture.runAsync(task).exceptionally(e -> {
futures.get().forEach(f -> {
if (!f.isDone()) {
f.completeExceptionally(e);
}
});
return null;
});
}

protected static String parent(String path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,11 +458,12 @@ public void testThreadSwitchOfZkMetadataStore(boolean hasSynchronizer, boolean e
@Cleanup
ZKMetadataStore store = (ZKMetadataStore) MetadataStoreFactory.create(zks.getConnectionString(), config);

String forkJoinPoolNamePrefix = "ForkJoinPool.commonPool";
final Runnable verify = () -> {
String currentThreadName = Thread.currentThread().getName();
String errorMessage = String.format("Expect to switch to thread %s, but currently it is thread %s",
metadataStoreName, currentThreadName);
assertTrue(Thread.currentThread().getName().startsWith(metadataStoreName), errorMessage);
String errorMessage = String.format("Expect to switch to thread %s*, but currently it is thread %s",
forkJoinPoolNamePrefix, currentThreadName);
assertTrue(currentThreadName.startsWith(forkJoinPoolNamePrefix), errorMessage);
};

// put with node which has parent(but the parent node is not exists).
Expand Down

0 comments on commit 76c1b4e

Please sign in to comment.