Skip to content

Commit

Permalink
Refactor read only chunk timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanlb committed Feb 12, 2024
1 parent a5a3209 commit 961df72
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 22 deletions.
29 changes: 7 additions & 22 deletions kaldb/src/main/java/com/slack/kaldb/chunk/ReadOnlyChunkImpl.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.slack.kaldb.chunk;

import static com.slack.kaldb.server.KaldbConfig.DEFAULT_ZK_TIMEOUT_SECS;

import com.google.common.annotations.VisibleForTesting;
import com.slack.kaldb.blobfs.BlobFs;
import com.slack.kaldb.logstore.search.LogIndexSearcher;
Expand Down Expand Up @@ -48,10 +50,6 @@
public class ReadOnlyChunkImpl<T> implements Chunk<T> {

private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyChunkImpl.class);

@Deprecated // replace with sync methods, which use DEFAULT_ZK_TIMEOUT_SECS where possible
private static final int TIMEOUT_MS = 5000;

private ChunkInfo chunkInfo;
private LogIndexSearcher<T> logSearcher;
private SearchMetadata searchMetadata;
Expand Down Expand Up @@ -174,20 +172,14 @@ public static SearchMetadata registerSearchMetadata(
snapshotName, cacheSearchContext.hostname),
snapshotName,
cacheSearchContext.toUrl());
searchMetadataStore
.createAsync(metadata)
.toCompletableFuture()
.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
searchMetadataStore.createSync(metadata);
return metadata;
}

private void unregisterSearchMetadata()
throws ExecutionException, InterruptedException, TimeoutException {
if (this.searchMetadata != null) {
searchMetadataStore
.deleteAsync(searchMetadata)
.toCompletableFuture()
.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
searchMetadataStore.deleteSync(searchMetadata);
}
}

Expand Down Expand Up @@ -265,15 +257,8 @@ private void handleChunkAssignment(CacheSlotMetadata cacheSlotMetadata) {

private SnapshotMetadata getSnapshotMetadata(String replicaId)
throws ExecutionException, InterruptedException, TimeoutException {
ReplicaMetadata replicaMetadata =
replicaMetadataStore
.findAsync(replicaId)
.toCompletableFuture()
.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
return snapshotMetadataStore
.findAsync(replicaMetadata.snapshotId)
.toCompletableFuture()
.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
ReplicaMetadata replicaMetadata = replicaMetadataStore.findSync(replicaId);
return snapshotMetadataStore.findSync(replicaMetadata.snapshotId);
}

// We lock access when manipulating the chunk, as the close()
Expand Down Expand Up @@ -319,7 +304,7 @@ private boolean setChunkMetadataState(
try {
cacheSlotMetadataStore
.updateNonFreeCacheSlotState(cacheSlotMetadata, newState)
.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
.get(DEFAULT_ZK_TIMEOUT_SECS, TimeUnit.MILLISECONDS);
return true;
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.error("Error setting chunk metadata state", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,16 @@ public void shouldConvertSchemaDefinitionToFromProto() {
KaldbSearch.SchemaDefinition.newBuilder()
.setType(KaldbSearch.FieldType.INTEGER)
.build());

assertThat(
SearchResultUtils.fromSchemaDefinitionProto(
KaldbSearch.SchemaDefinition.newBuilder()
.setType(KaldbSearch.FieldType.ID)
.build()))
.isEqualTo(FieldType.ID);
assertThat(SearchResultUtils.toSchemaDefinitionProto(FieldType.ID))
.isEqualTo(
KaldbSearch.SchemaDefinition.newBuilder().setType(KaldbSearch.FieldType.ID).build());
}

@Test
Expand Down

0 comments on commit 961df72

Please sign in to comment.