Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add latch for ZK hydration initialization #665

Merged
merged 2 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -92,9 +95,11 @@ public class KaldbDistributedQueryService extends KaldbQueryServiceBase implemen
// is used for controlling lucene future timeouts.
private final Duration requestTimeout;
private final Duration defaultQueryTimeout;

private final ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture<?> pendingStubUpdate;
private final KaldbMetadataStoreChangeListener<SearchMetadata> searchMetadataListener =
(searchMetadata) -> updateStubs();
(searchMetadata) -> triggerStubUpdate();

// For now we will use SearchMetadataStore to populate servers
// But this is wasteful since we add snapshots more often than we add/remove nodes ( hopefully )
Expand All @@ -114,22 +119,35 @@ public KaldbDistributedQueryService(
this.requestTimeout = requestTimeout;
this.defaultQueryTimeout = defaultQueryTimeout;
searchMetadataTotalChangeCounter = meterRegistry.counter(SEARCH_METADATA_TOTAL_CHANGE_COUNTER);
this.searchMetadataStore.addListener(searchMetadataListener);
this.distributedQueryApdexSatisfied = meterRegistry.counter(DISTRIBUTED_QUERY_APDEX_SATISFIED);
this.distributedQueryApdexTolerating =
meterRegistry.counter(DISTRIBUTED_QUERY_APDEX_TOLERATING);
this.distributedQueryApdexFrustrated =
meterRegistry.counter(DISTRIBUTED_QUERY_APDEX_FRUSTRATED);

this.distributedQueryTotalSnapshots = meterRegistry.counter(DISTRIBUTED_QUERY_TOTAL_SNAPSHOTS);
this.distributedQuerySnapshotsWithReplicas =
meterRegistry.counter(DISTRIBUTED_QUERY_SNAPSHOTS_WITH_REPLICAS);

// first time call this function manually so that we initialize stubs
updateStubs();
// start listening for new events
this.searchMetadataStore.addListener(searchMetadataListener);

// trigger an update, if it hasn't already happened
triggerStubUpdate();
}

private void triggerStubUpdate() {
if (pendingStubUpdate == null || pendingStubUpdate.getDelay(TimeUnit.SECONDS) <= 0) {
// Add a small aggregation window to prevent churn of zk updates causing too many internal
// updates
pendingStubUpdate = executorService.schedule(this::doStubUpdate, 1500, TimeUnit.MILLISECONDS);
} else {
LOG.debug(
"Update stubs already queued for execution, will run in {} ms",
pendingStubUpdate.getDelay(TimeUnit.MILLISECONDS));
}
}

private void updateStubs() {
private void doStubUpdate() {
try {
searchMetadataTotalChangeCounter.increment();
Set<String> latestSearchServers = new HashSet<>();
Expand Down Expand Up @@ -163,7 +181,7 @@ private void updateStubs() {
}
});

LOG.info(
LOG.debug(
"SearchMetadata listener event. previous_total_stub_count={} current_total_stub_count={} added_stubs={} removed_stubs={}",
currentSearchMetadataCount,
stubs.size(),
Expand Down Expand Up @@ -321,9 +339,9 @@ private KaldbServiceGrpc.KaldbServiceFutureStub getStub(String url) {
return stubs.get(url);
} else {
LOG.warn(
"snapshot {} is not cached. ZK listener on searchMetadataStore should have cached the stub",
"snapshot {} is not cached. ZK listener on searchMetadataStore should have cached the stub. Will attempt to get uncached, which will be slow.",
url);
return null;
return getKaldbServiceGrpcClient(url);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

import static com.slack.kaldb.server.KaldbConfig.DEFAULT_ZK_TIMEOUT_SECS;

import com.slack.kaldb.util.RuntimeHalterImpl;
import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -33,6 +35,8 @@ public class KaldbMetadataStore<T extends KaldbMetadata> implements Closeable {

private final ZPath zPath;

private final CountDownLatch cacheInitialized = new CountDownLatch(1);

protected final ModeledFramework<T> modeledClient;

private final CachedModeledFramework<T> cachedModeledFramework;
Expand Down Expand Up @@ -61,6 +65,7 @@ public KaldbMetadataStore(

if (shouldCache) {
cachedModeledFramework = modeledClient.cached();
cachedModeledFramework.listenable().addListener(getCacheInitializedListener());
cachedModeledFramework.start();
} else {
cachedModeledFramework = null;
Expand Down Expand Up @@ -99,6 +104,7 @@ public T getSync(String path) {

public CompletionStage<Stat> hasAsync(String path) {
if (cachedModeledFramework != null) {
awaitCacheInitialized();
return cachedModeledFramework.withPath(zPath.resolved(path)).checkExists();
}
return modeledClient.withPath(zPath.resolved(path)).checkExists();
Expand Down Expand Up @@ -155,9 +161,11 @@ public void deleteSync(T metadataNode) {
}

public CompletionStage<List<T>> listAsync() {
if (cachedModeledFramework == null)
if (cachedModeledFramework == null) {
throw new UnsupportedOperationException("Caching is disabled");
}

awaitCacheInitialized();
return cachedModeledFramework.list();
}

Expand All @@ -170,8 +178,9 @@ public List<T> listSync() {
}

public void addListener(KaldbMetadataStoreChangeListener<T> watcher) {
if (cachedModeledFramework == null)
if (cachedModeledFramework == null) {
throw new UnsupportedOperationException("Caching is disabled");
}

// this mapping exists because the remove is by reference, and the listener is a different
// object type
Expand All @@ -187,11 +196,35 @@ public void addListener(KaldbMetadataStoreChangeListener<T> watcher) {
}

public void removeListener(KaldbMetadataStoreChangeListener<T> watcher) {
if (cachedModeledFramework == null)
if (cachedModeledFramework == null) {
throw new UnsupportedOperationException("Caching is disabled");
}
cachedModeledFramework.listenable().removeListener(listenerMap.remove(watcher));
}

private void awaitCacheInitialized() {
try {
cacheInitialized.await();
} catch (InterruptedException e) {
new RuntimeHalterImpl().handleFatal(e);
}
}

private ModeledCacheListener<T> getCacheInitializedListener() {
return new ModeledCacheListener<T>() {
@Override
public void accept(Type type, ZPath path, Stat stat, T model) {
// no-op
}

@Override
public void initialized() {
ModeledCacheListener.super.initialized();
cacheInitialized.countDown();
}
};
}

@Override
public synchronized void close() {
if (cachedModeledFramework != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void setUp() throws Exception {
KaldbConfigs.ZookeeperConfig.newBuilder()
.setZkConnectString(testingServer.getConnectString())
.setZkPathPrefix("Test")
.setZkSessionTimeoutMs(1000)
.setZkSessionTimeoutMs(10000)
.setZkConnectionTimeoutMs(1000)
.setSleepBetweenRetriesMs(500)
.build();
Expand Down Expand Up @@ -394,4 +394,56 @@ public TestMetadataStore() {
throw new RuntimeException(e);
}
}

@Test
public void testSlowCacheInitialization() {
class FastMetadataStore extends KaldbMetadataStore<TestMetadata> {
public FastMetadataStore() {
super(
curatorFramework,
CreateMode.PERSISTENT,
true,
new JacksonModelSerializer<>(TestMetadata.class),
STORE_FOLDER);
}
}

class SlowSerializer implements ModelSerializer<TestMetadata> {
final JacksonModelSerializer<TestMetadata> serializer =
new JacksonModelSerializer<>(TestMetadata.class);

@Override
public byte[] serialize(TestMetadata model) {
return serializer.serialize(model);
}

@Override
public TestMetadata deserialize(byte[] bytes) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return serializer.deserialize(bytes);
}
}

class SlowMetadataStore extends KaldbMetadataStore<TestMetadata> {
public SlowMetadataStore() {
super(curatorFramework, CreateMode.PERSISTENT, true, new SlowSerializer(), STORE_FOLDER);
}
}

int testMetadataInitCount = 10;
try (KaldbMetadataStore<TestMetadata> init = new FastMetadataStore()) {
for (int i = 0; i < testMetadataInitCount; i++) {
init.createSync(new TestMetadata("name" + i, "value" + i));
}
}

try (KaldbMetadataStore<TestMetadata> init = new SlowMetadataStore()) {
List<TestMetadata> metadata = init.listSync();
assertThat(metadata.size()).isEqualTo(testMetadataInitCount);
}
}
}