diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/search/KaldbDistributedQueryService.java b/kaldb/src/main/java/com/slack/kaldb/logstore/search/KaldbDistributedQueryService.java index 46271accfb..27ac96c860 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/search/KaldbDistributedQueryService.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/search/KaldbDistributedQueryService.java @@ -32,7 +32,10 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -92,9 +95,11 @@ public class KaldbDistributedQueryService extends KaldbQueryServiceBase implemen // is used for controlling lucene future timeouts. private final Duration requestTimeout; private final Duration defaultQueryTimeout; - + private final ScheduledExecutorService executorService = + Executors.newSingleThreadScheduledExecutor(); + private ScheduledFuture pendingStubUpdate; private final KaldbMetadataStoreChangeListener searchMetadataListener = - (searchMetadata) -> updateStubs(); + (searchMetadata) -> triggerStubUpdate(); // For now we will use SearchMetadataStore to populate servers // But this is wasteful since we add snapshots more often than we add/remove nodes ( hopefully ) @@ -114,22 +119,35 @@ public KaldbDistributedQueryService( this.requestTimeout = requestTimeout; this.defaultQueryTimeout = defaultQueryTimeout; searchMetadataTotalChangeCounter = meterRegistry.counter(SEARCH_METADATA_TOTAL_CHANGE_COUNTER); - this.searchMetadataStore.addListener(searchMetadataListener); this.distributedQueryApdexSatisfied = meterRegistry.counter(DISTRIBUTED_QUERY_APDEX_SATISFIED); this.distributedQueryApdexTolerating = 
meterRegistry.counter(DISTRIBUTED_QUERY_APDEX_TOLERATING); this.distributedQueryApdexFrustrated = meterRegistry.counter(DISTRIBUTED_QUERY_APDEX_FRUSTRATED); - this.distributedQueryTotalSnapshots = meterRegistry.counter(DISTRIBUTED_QUERY_TOTAL_SNAPSHOTS); this.distributedQuerySnapshotsWithReplicas = meterRegistry.counter(DISTRIBUTED_QUERY_SNAPSHOTS_WITH_REPLICAS); - // first time call this function manually so that we initialize stubs - updateStubs(); + // start listening for new events + this.searchMetadataStore.addListener(searchMetadataListener); + + // trigger an update, if it hasn't already happened + triggerStubUpdate(); + } + + private void triggerStubUpdate() { + if (pendingStubUpdate == null || pendingStubUpdate.getDelay(TimeUnit.MILLISECONDS) <= 0) { + // Add a small aggregation window to prevent churn of zk updates causing too many internal + // updates + pendingStubUpdate = executorService.schedule(this::doStubUpdate, 1500, TimeUnit.MILLISECONDS); + } else { + LOG.debug( + "Update stubs already queued for execution, will run in {} ms", + pendingStubUpdate.getDelay(TimeUnit.MILLISECONDS)); + } } - private void updateStubs() { + private void doStubUpdate() { try { searchMetadataTotalChangeCounter.increment(); Set latestSearchServers = new HashSet<>(); @@ -163,7 +181,7 @@ private void updateStubs() { } }); - LOG.info( + LOG.debug( "SearchMetadata listener event. previous_total_stub_count={} current_total_stub_count={} added_stubs={} removed_stubs={}", currentSearchMetadataCount, stubs.size(), @@ -321,9 +339,9 @@ private KaldbServiceGrpc.KaldbServiceFutureStub getStub(String url) { return stubs.get(url); } else { LOG.warn( - "snapshot {} is not cached. ZK listener on searchMetadataStore should have cached the stub", + "snapshot {} is not cached. ZK listener on searchMetadataStore should have cached the stub. 
Will attempt to get uncached, which will be slow.", url); - return null; + return getKaldbServiceGrpcClient(url); } } diff --git a/kaldb/src/main/java/com/slack/kaldb/metadata/core/KaldbMetadataStore.java b/kaldb/src/main/java/com/slack/kaldb/metadata/core/KaldbMetadataStore.java index 47351eb186..dfd48fe019 100644 --- a/kaldb/src/main/java/com/slack/kaldb/metadata/core/KaldbMetadataStore.java +++ b/kaldb/src/main/java/com/slack/kaldb/metadata/core/KaldbMetadataStore.java @@ -2,12 +2,14 @@ import static com.slack.kaldb.server.KaldbConfig.DEFAULT_ZK_TIMEOUT_SECS; +import com.slack.kaldb.util.RuntimeHalterImpl; import java.io.Closeable; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -33,6 +35,8 @@ public class KaldbMetadataStore implements Closeable { private final ZPath zPath; + private final CountDownLatch cacheInitialized = new CountDownLatch(1); + protected final ModeledFramework modeledClient; private final CachedModeledFramework cachedModeledFramework; @@ -61,6 +65,7 @@ public KaldbMetadataStore( if (shouldCache) { cachedModeledFramework = modeledClient.cached(); + cachedModeledFramework.listenable().addListener(getCacheInitializedListener()); cachedModeledFramework.start(); } else { cachedModeledFramework = null; @@ -99,6 +104,7 @@ public T getSync(String path) { public CompletionStage hasAsync(String path) { if (cachedModeledFramework != null) { + awaitCacheInitialized(); return cachedModeledFramework.withPath(zPath.resolved(path)).checkExists(); } return modeledClient.withPath(zPath.resolved(path)).checkExists(); @@ -155,9 +161,11 @@ public void deleteSync(T metadataNode) { } public CompletionStage> listAsync() { - if (cachedModeledFramework == null) + if 
(cachedModeledFramework == null) { throw new UnsupportedOperationException("Caching is disabled"); + } + awaitCacheInitialized(); return cachedModeledFramework.list(); } @@ -170,8 +178,9 @@ public List listSync() { } public void addListener(KaldbMetadataStoreChangeListener watcher) { - if (cachedModeledFramework == null) + if (cachedModeledFramework == null) { throw new UnsupportedOperationException("Caching is disabled"); + } // this mapping exists because the remove is by reference, and the listener is a different // object type @@ -187,11 +196,35 @@ public void addListener(KaldbMetadataStoreChangeListener watcher) { } public void removeListener(KaldbMetadataStoreChangeListener watcher) { - if (cachedModeledFramework == null) + if (cachedModeledFramework == null) { throw new UnsupportedOperationException("Caching is disabled"); + } cachedModeledFramework.listenable().removeListener(listenerMap.remove(watcher)); } + private void awaitCacheInitialized() { + try { + cacheInitialized.await(); + } catch (InterruptedException e) { + new RuntimeHalterImpl().handleFatal(e); + } + } + + private ModeledCacheListener getCacheInitializedListener() { + return new ModeledCacheListener() { + @Override + public void accept(Type type, ZPath path, Stat stat, T model) { + // no-op + } + + @Override + public void initialized() { + ModeledCacheListener.super.initialized(); + cacheInitialized.countDown(); + } + }; + } + @Override public synchronized void close() { if (cachedModeledFramework != null) { diff --git a/kaldb/src/test/java/com/slack/kaldb/metadata/core/KaldbMetadataStoreTest.java b/kaldb/src/test/java/com/slack/kaldb/metadata/core/KaldbMetadataStoreTest.java index 4dbc90cd4c..a139581c94 100644 --- a/kaldb/src/test/java/com/slack/kaldb/metadata/core/KaldbMetadataStoreTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/metadata/core/KaldbMetadataStoreTest.java @@ -42,7 +42,7 @@ public void setUp() throws Exception { KaldbConfigs.ZookeeperConfig.newBuilder() 
.setZkConnectString(testingServer.getConnectString()) .setZkPathPrefix("Test") - .setZkSessionTimeoutMs(1000) + .setZkSessionTimeoutMs(10000) .setZkConnectionTimeoutMs(1000) .setSleepBetweenRetriesMs(500) .build(); @@ -394,4 +394,56 @@ public TestMetadataStore() { throw new RuntimeException(e); } } + + @Test + public void testSlowCacheInitialization() { + class FastMetadataStore extends KaldbMetadataStore { + public FastMetadataStore() { + super( + curatorFramework, + CreateMode.PERSISTENT, + true, + new JacksonModelSerializer<>(TestMetadata.class), + STORE_FOLDER); + } + } + + class SlowSerializer implements ModelSerializer { + final JacksonModelSerializer serializer = + new JacksonModelSerializer<>(TestMetadata.class); + + @Override + public byte[] serialize(TestMetadata model) { + return serializer.serialize(model); + } + + @Override + public TestMetadata deserialize(byte[] bytes) { + try { + Thread.sleep(200); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return serializer.deserialize(bytes); + } + } + + class SlowMetadataStore extends KaldbMetadataStore { + public SlowMetadataStore() { + super(curatorFramework, CreateMode.PERSISTENT, true, new SlowSerializer(), STORE_FOLDER); + } + } + + int testMetadataInitCount = 10; + try (KaldbMetadataStore init = new FastMetadataStore()) { + for (int i = 0; i < testMetadataInitCount; i++) { + init.createSync(new TestMetadata("name" + i, "value" + i)); + } + } + + try (KaldbMetadataStore init = new SlowMetadataStore()) { + List metadata = init.listSync(); + assertThat(metadata.size()).isEqualTo(testMetadataInitCount); + } + } }