Skip to content

Commit

Permalink
vthread
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Apr 20, 2024
1 parent d832553 commit 4a42ede
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 16 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ flexible messaging model and an intuitive client API.</description>
</issueManagement>

<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<pulsar.broker.compiler.release>${maven.compiler.target}</pulsar.broker.compiler.release>
<pulsar.client.compiler.release>8</pulsar.client.compiler.release>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -76,6 +77,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
private final AsyncLoadingCache<String, Boolean> existsCache;
private final CopyOnWriteArrayList<MetadataCacheImpl<?>> metadataCaches = new CopyOnWriteArrayList<>();
private final MetadataStoreStats metadataStoreStats;
private final Thread.Builder vThreadBuilder;

// We don't strictly need to use 'volatile' here because we don't need the precise consistent semantic. Instead,
// we want to avoid the overhead of 'volatile'.
Expand Down Expand Up @@ -138,6 +140,12 @@ public CompletableFuture<Boolean> asyncReload(String key, Boolean oldValue,

this.metadataStoreName = metadataStoreName;
this.metadataStoreStats = new MetadataStoreStats(metadataStoreName);

Properties systemProperties = System.getProperties();
systemProperties.put("jdk.virtualThreadScheduler.parallelism", "1");
systemProperties.put("jdk.virtualThreadScheduler.maxPoolSize", "1");
systemProperties.put("jdk.virtualThreadScheduler.minRunnable", "1");
vThreadBuilder = Thread.ofVirtual().name("Metadata-VThread", 0);
}

@Override
Expand Down Expand Up @@ -541,16 +549,21 @@ public void invalidateCaches(String...paths) {
}
}



/**
* Run the task in the ForkJoinPool.commonPool thread and fail the future if exceptionally.
*/
@VisibleForTesting
public void execute(Runnable task, CompletableFuture<?> future) {
CompletableFuture.runAsync(task).exceptionally(e -> {
if (!future.isDone()) {
future.completeExceptionally(e);
vThreadBuilder.start(() -> {
try {
task.run();
} catch (Throwable t) {
if (!future.isDone()) {
future.completeExceptionally(t);
}
}
return null;
});
}

Expand All @@ -559,13 +572,16 @@ public void execute(Runnable task, CompletableFuture<?> future) {
*/
@VisibleForTesting
public void execute(Runnable task, Supplier<List<CompletableFuture<?>>> futures) {
CompletableFuture.runAsync(task).exceptionally(e -> {
futures.get().forEach(f -> {
if (!f.isDone()) {
f.completeExceptionally(e);
}
});
return null;
vThreadBuilder.start(() -> {
try {
task.run();
} catch (Throwable t) {
futures.get().forEach(f -> {
if (!f.isDone()) {
f.completeExceptionally(t);
}
});
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,12 +458,12 @@ public void testThreadSwitchOfZkMetadataStore(boolean hasSynchronizer, boolean e
@Cleanup
ZKMetadataStore store = (ZKMetadataStore) MetadataStoreFactory.create(zks.getConnectionString(), config);

String forkJoinPoolNamePrefix = "ForkJoinPool.commonPool";
String poolNamePrefix = "Metadata-VThread";
final Runnable verify = () -> {
String currentThreadName = Thread.currentThread().getName();
String errorMessage = String.format("Expect to switch to thread %s*, but currently it is thread %s",
forkJoinPoolNamePrefix, currentThreadName);
assertTrue(currentThreadName.startsWith(forkJoinPoolNamePrefix), errorMessage);
poolNamePrefix, currentThreadName);
assertTrue(currentThreadName.startsWith(poolNamePrefix), errorMessage);
};

// put with node which has parent(but the parent node is not exists).
Expand Down

0 comments on commit 4a42ede

Please sign in to comment.