Skip to content

Commit

Permalink
Add latch for ZK hydration initialization (#665)
Browse files Browse the repository at this point in the history
* Revert "Revert "Add latch for ZK hydration initialization (#655)" (#664)"

This reverts commit 451c305.

* Change distributed query snapshot initialization

Reduces churn due to ZK updates, separate executor to prevent potential deadlock

---------

Co-authored-by: Bryan Burkholder <[email protected]>
  • Loading branch information
bryanlb and bryanlb authored Sep 5, 2023
1 parent 463fa54 commit b671b7c
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -92,9 +95,11 @@ public class KaldbDistributedQueryService extends KaldbQueryServiceBase implemen
// is used for controlling lucene future timeouts.
private final Duration requestTimeout;
private final Duration defaultQueryTimeout;

private final ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture<?> pendingStubUpdate;
private final KaldbMetadataStoreChangeListener<SearchMetadata> searchMetadataListener =
(searchMetadata) -> updateStubs();
(searchMetadata) -> triggerStubUpdate();

// For now we will use SearchMetadataStore to populate servers
// But this is wasteful since we add snapshots more often than we add/remove nodes ( hopefully )
Expand All @@ -114,22 +119,35 @@ public KaldbDistributedQueryService(
this.requestTimeout = requestTimeout;
this.defaultQueryTimeout = defaultQueryTimeout;
searchMetadataTotalChangeCounter = meterRegistry.counter(SEARCH_METADATA_TOTAL_CHANGE_COUNTER);
this.searchMetadataStore.addListener(searchMetadataListener);
this.distributedQueryApdexSatisfied = meterRegistry.counter(DISTRIBUTED_QUERY_APDEX_SATISFIED);
this.distributedQueryApdexTolerating =
meterRegistry.counter(DISTRIBUTED_QUERY_APDEX_TOLERATING);
this.distributedQueryApdexFrustrated =
meterRegistry.counter(DISTRIBUTED_QUERY_APDEX_FRUSTRATED);

this.distributedQueryTotalSnapshots = meterRegistry.counter(DISTRIBUTED_QUERY_TOTAL_SNAPSHOTS);
this.distributedQuerySnapshotsWithReplicas =
meterRegistry.counter(DISTRIBUTED_QUERY_SNAPSHOTS_WITH_REPLICAS);

// first time call this function manually so that we initialize stubs
updateStubs();
// start listening for new events
this.searchMetadataStore.addListener(searchMetadataListener);

// trigger an update, if it hasn't already happened
triggerStubUpdate();
}

private void triggerStubUpdate() {
if (pendingStubUpdate == null || pendingStubUpdate.getDelay(TimeUnit.SECONDS) <= 0) {
// Add a small aggregation window to prevent churn of zk updates causing too many internal
// updates
pendingStubUpdate = executorService.schedule(this::doStubUpdate, 1500, TimeUnit.MILLISECONDS);
} else {
LOG.debug(
"Update stubs already queued for execution, will run in {} ms",
pendingStubUpdate.getDelay(TimeUnit.MILLISECONDS));
}
}

private void updateStubs() {
private void doStubUpdate() {
try {
searchMetadataTotalChangeCounter.increment();
Set<String> latestSearchServers = new HashSet<>();
Expand Down Expand Up @@ -163,7 +181,7 @@ private void updateStubs() {
}
});

LOG.info(
LOG.debug(
"SearchMetadata listener event. previous_total_stub_count={} current_total_stub_count={} added_stubs={} removed_stubs={}",
currentSearchMetadataCount,
stubs.size(),
Expand Down Expand Up @@ -321,9 +339,9 @@ private KaldbServiceGrpc.KaldbServiceFutureStub getStub(String url) {
return stubs.get(url);
} else {
LOG.warn(
"snapshot {} is not cached. ZK listener on searchMetadataStore should have cached the stub",
"snapshot {} is not cached. ZK listener on searchMetadataStore should have cached the stub. Will attempt to get uncached, which will be slow.",
url);
return null;
return getKaldbServiceGrpcClient(url);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

import static com.slack.kaldb.server.KaldbConfig.DEFAULT_ZK_TIMEOUT_SECS;

import com.slack.kaldb.util.RuntimeHalterImpl;
import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -33,6 +35,8 @@ public class KaldbMetadataStore<T extends KaldbMetadata> implements Closeable {

private final ZPath zPath;

private final CountDownLatch cacheInitialized = new CountDownLatch(1);

protected final ModeledFramework<T> modeledClient;

private final CachedModeledFramework<T> cachedModeledFramework;
Expand Down Expand Up @@ -61,6 +65,7 @@ public KaldbMetadataStore(

if (shouldCache) {
cachedModeledFramework = modeledClient.cached();
cachedModeledFramework.listenable().addListener(getCacheInitializedListener());
cachedModeledFramework.start();
} else {
cachedModeledFramework = null;
Expand Down Expand Up @@ -99,6 +104,7 @@ public T getSync(String path) {

public CompletionStage<Stat> hasAsync(String path) {
if (cachedModeledFramework != null) {
awaitCacheInitialized();
return cachedModeledFramework.withPath(zPath.resolved(path)).checkExists();
}
return modeledClient.withPath(zPath.resolved(path)).checkExists();
Expand Down Expand Up @@ -155,9 +161,11 @@ public void deleteSync(T metadataNode) {
}

public CompletionStage<List<T>> listAsync() {
if (cachedModeledFramework == null)
if (cachedModeledFramework == null) {
throw new UnsupportedOperationException("Caching is disabled");
}

awaitCacheInitialized();
return cachedModeledFramework.list();
}

Expand All @@ -170,8 +178,9 @@ public List<T> listSync() {
}

public void addListener(KaldbMetadataStoreChangeListener<T> watcher) {
if (cachedModeledFramework == null)
if (cachedModeledFramework == null) {
throw new UnsupportedOperationException("Caching is disabled");
}

// this mapping exists because the remove is by reference, and the listener is a different
// object type
Expand All @@ -187,11 +196,35 @@ public void addListener(KaldbMetadataStoreChangeListener<T> watcher) {
}

public void removeListener(KaldbMetadataStoreChangeListener<T> watcher) {
if (cachedModeledFramework == null)
if (cachedModeledFramework == null) {
throw new UnsupportedOperationException("Caching is disabled");
}
cachedModeledFramework.listenable().removeListener(listenerMap.remove(watcher));
}

private void awaitCacheInitialized() {
try {
cacheInitialized.await();
} catch (InterruptedException e) {
new RuntimeHalterImpl().handleFatal(e);
}
}

private ModeledCacheListener<T> getCacheInitializedListener() {
return new ModeledCacheListener<T>() {
@Override
public void accept(Type type, ZPath path, Stat stat, T model) {
// no-op
}

@Override
public void initialized() {
ModeledCacheListener.super.initialized();
cacheInitialized.countDown();
}
};
}

@Override
public synchronized void close() {
if (cachedModeledFramework != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void setUp() throws Exception {
KaldbConfigs.ZookeeperConfig.newBuilder()
.setZkConnectString(testingServer.getConnectString())
.setZkPathPrefix("Test")
.setZkSessionTimeoutMs(1000)
.setZkSessionTimeoutMs(10000)
.setZkConnectionTimeoutMs(1000)
.setSleepBetweenRetriesMs(500)
.build();
Expand Down Expand Up @@ -394,4 +394,56 @@ public TestMetadataStore() {
throw new RuntimeException(e);
}
}

@Test
public void testSlowCacheInitialization() {
class FastMetadataStore extends KaldbMetadataStore<TestMetadata> {
public FastMetadataStore() {
super(
curatorFramework,
CreateMode.PERSISTENT,
true,
new JacksonModelSerializer<>(TestMetadata.class),
STORE_FOLDER);
}
}

class SlowSerializer implements ModelSerializer<TestMetadata> {
final JacksonModelSerializer<TestMetadata> serializer =
new JacksonModelSerializer<>(TestMetadata.class);

@Override
public byte[] serialize(TestMetadata model) {
return serializer.serialize(model);
}

@Override
public TestMetadata deserialize(byte[] bytes) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return serializer.deserialize(bytes);
}
}

class SlowMetadataStore extends KaldbMetadataStore<TestMetadata> {
public SlowMetadataStore() {
super(curatorFramework, CreateMode.PERSISTENT, true, new SlowSerializer(), STORE_FOLDER);
}
}

int testMetadataInitCount = 10;
try (KaldbMetadataStore<TestMetadata> init = new FastMetadataStore()) {
for (int i = 0; i < testMetadataInitCount; i++) {
init.createSync(new TestMetadata("name" + i, "value" + i));
}
}

try (KaldbMetadataStore<TestMetadata> init = new SlowMetadataStore()) {
List<TestMetadata> metadata = init.listSync();
assertThat(metadata.size()).isEqualTo(testMetadataInitCount);
}
}
}

0 comments on commit b671b7c

Please sign in to comment.