Skip to content

Commit

Permalink
Use a cached executor for curator to prevent race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanlb committed Sep 1, 2023
1 parent 0525d8a commit 8512305
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Executors;

/** Builder class to instantiate an common curator instance for use in the metadata stores. */
public class CuratorBuilder {
private static final Logger LOG = LoggerFactory.getLogger(CuratorBuilder.class);
Expand Down Expand Up @@ -44,6 +46,7 @@ public static AsyncCuratorFramework build(
.connectionTimeoutMs(zkConfig.getZkConnectionTimeoutMs())
.sessionTimeoutMs(zkConfig.getZkSessionTimeoutMs())
.retryPolicy(retryPolicy)
.runSafeService(Executors.newCachedThreadPool())
.build();

// A catch-all handler for any errors we may have missed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand Down Expand Up @@ -66,10 +67,9 @@ public KaldbMetadataStore(
modeledClient = ModeledFramework.wrap(curator, modelSpec);

if (shouldCache) {
cachedModeledFramework = modeledClient.cached(Executors.newFixedThreadPool(2));
cachedModeledFramework.listenable().addListener(getCacheInitializedListener(), Executors.newFixedThreadPool(2));
cachedModeledFramework = modeledClient.cached();
cachedModeledFramework.listenable().addListener(getCacheInitializedListener());
cachedModeledFramework.start();
//awaitCacheInitialized();
} else {
cachedModeledFramework = null;
}
Expand Down

0 comments on commit 8512305

Please sign in to comment.