From daa5c85e00cf6a239857046390a6e475e905e409 Mon Sep 17 00:00:00 2001 From: Bryan Burkholder <771133+bryanlb@users.noreply.github.com> Date: Fri, 1 Sep 2023 12:14:48 -0700 Subject: [PATCH] Revert "Add latch for ZK hydration initialization (#655)" This reverts commit 8d8024cfc8b710809ac9a56b0758b98c82019a55. --- .../metadata/core/KaldbMetadataStore.java | 39 ++------------ .../metadata/core/KaldbMetadataStoreTest.java | 54 +------------------ 2 files changed, 4 insertions(+), 89 deletions(-) diff --git a/kaldb/src/main/java/com/slack/kaldb/metadata/core/KaldbMetadataStore.java b/kaldb/src/main/java/com/slack/kaldb/metadata/core/KaldbMetadataStore.java index dfd48fe019..47351eb186 100644 --- a/kaldb/src/main/java/com/slack/kaldb/metadata/core/KaldbMetadataStore.java +++ b/kaldb/src/main/java/com/slack/kaldb/metadata/core/KaldbMetadataStore.java @@ -2,14 +2,12 @@ import static com.slack.kaldb.server.KaldbConfig.DEFAULT_ZK_TIMEOUT_SECS; -import com.slack.kaldb.util.RuntimeHalterImpl; import java.io.Closeable; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -35,8 +33,6 @@ public class KaldbMetadataStore implements Closeable { private final ZPath zPath; - private final CountDownLatch cacheInitialized = new CountDownLatch(1); - protected final ModeledFramework modeledClient; private final CachedModeledFramework cachedModeledFramework; @@ -65,7 +61,6 @@ public KaldbMetadataStore( if (shouldCache) { cachedModeledFramework = modeledClient.cached(); - cachedModeledFramework.listenable().addListener(getCacheInitializedListener()); cachedModeledFramework.start(); } else { cachedModeledFramework = null; @@ -104,7 +99,6 @@ public T getSync(String path) { public CompletionStage hasAsync(String path) { if (cachedModeledFramework != null) { - awaitCacheInitialized(); return cachedModeledFramework.withPath(zPath.resolved(path)).checkExists(); } return modeledClient.withPath(zPath.resolved(path)).checkExists(); @@ -161,11 +155,9 @@ public void deleteSync(T metadataNode) { } public CompletionStage> listAsync() { - if (cachedModeledFramework == null) { + if (cachedModeledFramework == null) throw new UnsupportedOperationException("Caching is disabled"); - } - awaitCacheInitialized(); return cachedModeledFramework.list(); } @@ -178,9 +170,8 @@ public List listSync() { } public void addListener(KaldbMetadataStoreChangeListener watcher) { - if (cachedModeledFramework == null) { + if (cachedModeledFramework == null) throw new UnsupportedOperationException("Caching is disabled"); - } // this mapping exists because the remove is by reference, and the listener is a different // object type @@ -196,35 +187,11 @@ public void addListener(KaldbMetadataStoreChangeListener watcher) { } public void removeListener(KaldbMetadataStoreChangeListener watcher) { - if (cachedModeledFramework == null) { + if (cachedModeledFramework == null) throw new UnsupportedOperationException("Caching is disabled"); - } cachedModeledFramework.listenable().removeListener(listenerMap.remove(watcher)); } - private void awaitCacheInitialized() { - try { - cacheInitialized.await(); - } catch (InterruptedException e) { - new RuntimeHalterImpl().handleFatal(e); - } - } - - private ModeledCacheListener getCacheInitializedListener() { - return new ModeledCacheListener() { - @Override - public void accept(Type type, ZPath path, Stat stat, T model) { - // no-op - } - - @Override - public void initialized() { - ModeledCacheListener.super.initialized(); - cacheInitialized.countDown(); - } - }; - } - @Override public synchronized void close() { if (cachedModeledFramework != null) { diff --git a/kaldb/src/test/java/com/slack/kaldb/metadata/core/KaldbMetadataStoreTest.java b/kaldb/src/test/java/com/slack/kaldb/metadata/core/KaldbMetadataStoreTest.java index a139581c94..4dbc90cd4c 100644 --- a/kaldb/src/test/java/com/slack/kaldb/metadata/core/KaldbMetadataStoreTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/metadata/core/KaldbMetadataStoreTest.java @@ -42,7 +42,7 @@ public void setUp() throws Exception { KaldbConfigs.ZookeeperConfig.newBuilder() .setZkConnectString(testingServer.getConnectString()) .setZkPathPrefix("Test") - .setZkSessionTimeoutMs(10000) + .setZkSessionTimeoutMs(1000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(500) .build(); @@ -394,56 +394,4 @@ public TestMetadataStore() { throw new RuntimeException(e); } } - - @Test - public void testSlowCacheInitialization() { - class FastMetadataStore extends KaldbMetadataStore { - public FastMetadataStore() { - super( - curatorFramework, - CreateMode.PERSISTENT, - true, - new JacksonModelSerializer<>(TestMetadata.class), - STORE_FOLDER); - } - } - - class SlowSerializer implements ModelSerializer { - final JacksonModelSerializer serializer = - new JacksonModelSerializer<>(TestMetadata.class); - - @Override - public byte[] serialize(TestMetadata model) { - return serializer.serialize(model); - } - - @Override - public TestMetadata deserialize(byte[] bytes) { - try { - Thread.sleep(200); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - return serializer.deserialize(bytes); - } - } - - class SlowMetadataStore extends KaldbMetadataStore { - public SlowMetadataStore() { - super(curatorFramework, CreateMode.PERSISTENT, true, new SlowSerializer(), STORE_FOLDER); - } - } - - int testMetadataInitCount = 10; - try (KaldbMetadataStore init = new FastMetadataStore()) { - for (int i = 0; i < testMetadataInitCount; i++) { - init.createSync(new TestMetadata("name" + i, "value" + i)); - } - } - - try (KaldbMetadataStore init = new SlowMetadataStore()) { - List metadata = init.listSync(); - assertThat(metadata.size()).isEqualTo(testMetadataInitCount); - } - } }