From 4a42ede8a8eff0f19be4b6b51a44d3502fc67472 Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Sat, 20 Apr 2024 09:52:49 -0700 Subject: [PATCH] vthread --- pom.xml | 4 +- .../metadata/impl/AbstractMetadataStore.java | 38 +++++++++++++------ .../pulsar/metadata/MetadataStoreTest.java | 6 +-- 3 files changed, 32 insertions(+), 16 deletions(-) diff --git a/pom.xml b/pom.xml index 47ac21b62bfed..c3343ee4520de 100644 --- a/pom.xml +++ b/pom.xml @@ -77,8 +77,8 @@ flexible messaging model and an intuitive client API. - 17 - 17 + 21 + 21 ${maven.compiler.target} 8 diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index b0039a3211273..644287cf64d3b 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -32,6 +32,7 @@ import java.util.HashSet; import java.util.List; import java.util.Optional; +import java.util.Properties; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; @@ -76,6 +77,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co private final AsyncLoadingCache existsCache; private final CopyOnWriteArrayList> metadataCaches = new CopyOnWriteArrayList<>(); private final MetadataStoreStats metadataStoreStats; + private final Thread.Builder vThreadBuilder; // We don't strictly need to use 'volatile' here because we don't need the precise consistent semantic. Instead, // we want to avoid the overhead of 'volatile'. @@ -138,6 +140,12 @@ public CompletableFuture asyncReload(String key, Boolean oldValue, this.metadataStoreName = metadataStoreName; this.metadataStoreStats = new MetadataStoreStats(metadataStoreName); + + Properties systemProperties = System.getProperties(); + systemProperties.put("jdk.virtualThreadScheduler.parallelism", "1"); + systemProperties.put("jdk.virtualThreadScheduler.maxPoolSize", "1"); + systemProperties.put("jdk.virtualThreadScheduler.minRunnable", "1"); + vThreadBuilder = Thread.ofVirtual().name("Metadata-VThread", 0); } @Override @@ -541,16 +549,21 @@ public void invalidateCaches(String...paths) { } } + + /** * Run the task in the ForkJoinPool.commonPool thread and fail the future if exceptionally. */ @VisibleForTesting public void execute(Runnable task, CompletableFuture future) { - CompletableFuture.runAsync(task).exceptionally(e -> { - if (!future.isDone()) { - future.completeExceptionally(e); + vThreadBuilder.start(() -> { + try { + task.run(); + } catch (Throwable t) { + if (!future.isDone()) { + future.completeExceptionally(t); + } } - return null; }); } @@ -559,13 +572,16 @@ public void execute(Runnable task, CompletableFuture future) { */ @VisibleForTesting public void execute(Runnable task, Supplier>> futures) { - CompletableFuture.runAsync(task).exceptionally(e -> { - futures.get().forEach(f -> { - if (!f.isDone()) { - f.completeExceptionally(e); - } - }); - return null; + vThreadBuilder.start(() -> { + try { + task.run(); + } catch (Throwable t) { + futures.get().forEach(f -> { + if (!f.isDone()) { + f.completeExceptionally(t); + } + }); + } }); } diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java index 50d3b924d66d0..12d36f70d6387 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java @@ -458,12 +458,12 @@ public void testThreadSwitchOfZkMetadataStore(boolean hasSynchronizer, boolean e @Cleanup ZKMetadataStore store = (ZKMetadataStore) MetadataStoreFactory.create(zks.getConnectionString(), config); - String forkJoinPoolNamePrefix = "ForkJoinPool.commonPool"; + String poolNamePrefix = "Metadata-VThread"; final Runnable verify = () -> { String currentThreadName = Thread.currentThread().getName(); String errorMessage = String.format("Expect to switch to thread %s*, but currently it is thread %s", - forkJoinPoolNamePrefix, currentThreadName); - assertTrue(currentThreadName.startsWith(forkJoinPoolNamePrefix), errorMessage); + poolNamePrefix, currentThreadName); + assertTrue(currentThreadName.startsWith(poolNamePrefix), errorMessage); }; // put with node which has parent(but the parent node is not exists).