Skip to content

Commit

Permalink
feat: Add GRPC stream connection deadline (#999)
Browse files Browse the repository at this point in the history
Signed-off-by: Guido Breitenhuber <[email protected]>
Signed-off-by: Todd Baert <[email protected]>
Co-authored-by: Todd Baert <[email protected]>
  • Loading branch information
guidobrei and toddbaert authored Oct 4, 2024
1 parent 27543c9 commit 9de03df
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public final class Config {
static final String DEFAULT_HOST = "localhost";

static final int DEFAULT_DEADLINE = 500;
static final int DEFAULT_STREAM_DEADLINE_MS = 10 * 60 * 1000;
static final int DEFAULT_MAX_CACHE_SIZE = 1000;
static final long DEFAULT_KEEP_ALIVE = 0;

Expand All @@ -31,6 +32,7 @@ public final class Config {
static final String MAX_EVENT_STREAM_RETRIES_ENV_VAR_NAME = "FLAGD_MAX_EVENT_STREAM_RETRIES";
static final String BASE_EVENT_STREAM_RETRY_BACKOFF_MS_ENV_VAR_NAME = "FLAGD_RETRY_BACKOFF_MS";
static final String DEADLINE_MS_ENV_VAR_NAME = "FLAGD_DEADLINE_MS";
static final String STREAM_DEADLINE_MS_ENV_VAR_NAME = "FLAGD_STREAM_DEADLINE_MS";
static final String SOURCE_SELECTOR_ENV_VAR_NAME = "FLAGD_SOURCE_SELECTOR";
static final String OFFLINE_SOURCE_PATH = "FLAGD_OFFLINE_FLAG_SOURCE_PATH";
static final String KEEP_ALIVE_MS_ENV_VAR_NAME_OLD = "FLAGD_KEEP_ALIVE_TIME";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ public class FlagdOptions {
@Builder.Default
private int deadline = fallBackToEnvOrDefault(Config.DEADLINE_MS_ENV_VAR_NAME, Config.DEFAULT_DEADLINE);

/**
* Streaming connection deadline in milliseconds.
* Set to 0 to disable the deadline.
* Defaults to 600000 (10 minutes); recommended to prevent infrastructure from killing idle connections.
*/
@Builder.Default
private int streamDeadlineMs = fallBackToEnvOrDefault(Config.STREAM_DEADLINE_MS_ENV_VAR_NAME,
Config.DEFAULT_STREAM_DEADLINE_MS);

/**
* Selector to be used with flag sync gRPC contract.
**/
Expand All @@ -101,7 +110,7 @@ public class FlagdOptions {
/**
* gRPC client KeepAlive in milliseconds. Disabled with 0.
* Defaults to 0 (disabled).
*
*
**/
@Builder.Default
private long keepAlive = fallBackToEnvOrDefault(Config.KEEP_ALIVE_MS_ENV_VAR_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
import dev.openfeature.flagd.grpc.evaluation.Evaluation.EventStreamResponse;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -52,12 +54,18 @@ public void onNext(EventStreamResponse value) {
}

@Override
public void onError(Throwable t) {
log.warn("event stream", t);
if (this.cache.getEnabled()) {
this.cache.clear();
public void onError(Throwable throwable) {
if (throwable instanceof StatusRuntimeException
&& ((StatusRuntimeException) throwable).getStatus().getCode()
.equals(Code.DEADLINE_EXCEEDED)) {
log.debug(String.format("stream deadline reached; will re-establish"));
} else {
log.error(String.format("event stream error", throwable));
if (this.cache.getEnabled()) {
this.cache.clear();
}
this.onConnectionEvent.accept(false, Collections.emptyList());
}
this.onConnectionEvent.accept(false, Collections.emptyList());

// handle last call of this stream
handleEndOfStream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class GrpcConnector {

private final int startEventStreamRetryBackoff;
private final long deadline;
private final long streamDeadlineMs;

private final Cache cache;
private final Consumer<ConnectionEvent> onConnectionEvent;
Expand Down Expand Up @@ -64,6 +65,7 @@ public GrpcConnector(final FlagdOptions options, final Cache cache, final Suppli
this.startEventStreamRetryBackoff = options.getRetryBackoffMs();
this.eventStreamRetryBackoff = options.getRetryBackoffMs();
this.deadline = options.getDeadline();
this.streamDeadlineMs = options.getStreamDeadlineMs();
this.cache = cache;
this.onConnectionEvent = onConnectionEvent;
this.connectedSupplier = connectedSupplier;
Expand Down Expand Up @@ -126,7 +128,14 @@ private void observeEventStream() {
while (this.eventStreamAttempt <= this.maxEventStreamRetries) {
final StreamObserver<EventStreamResponse> responseObserver = new EventStreamObserver(sync, this.cache,
this::onConnectionEvent);
this.serviceStub.eventStream(EventStreamRequest.getDefaultInstance(), responseObserver);

ServiceGrpc.ServiceStub localServiceStub = this.serviceStub;

if (this.streamDeadlineMs > 0) {
localServiceStub = localServiceStub.withDeadlineAfter(this.streamDeadlineMs, TimeUnit.MILLISECONDS);
}

localServiceStub.eventStream(EventStreamRequest.getDefaultInstance(), responseObserver);

try {
synchronized (sync) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.grpc.Context;
import io.grpc.Context.CancellableContext;
import io.grpc.ManagedChannel;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -43,6 +45,7 @@ public class GrpcStreamConnector implements Connector {
private final FlagSyncServiceStub serviceStub;
private final FlagSyncServiceBlockingStub serviceBlockingStub;
private final int deadline;
private final int streamDeadlineMs;
private final String selector;

/**
Expand All @@ -55,6 +58,7 @@ public GrpcStreamConnector(final FlagdOptions options) {
serviceStub = FlagSyncServiceGrpc.newStub(channel);
serviceBlockingStub = FlagSyncServiceGrpc.newBlockingStub(channel);
deadline = options.getDeadline();
streamDeadlineMs = options.getStreamDeadlineMs();
selector = options.getSelector();
}

Expand All @@ -64,7 +68,8 @@ public GrpcStreamConnector(final FlagdOptions options) {
public void init() {
Thread listener = new Thread(() -> {
try {
observeEventStream(blockingQueue, shutdown, serviceStub, serviceBlockingStub, selector, deadline);
observeEventStream(blockingQueue, shutdown, serviceStub, serviceBlockingStub, selector, deadline,
streamDeadlineMs);
} catch (InterruptedException e) {
log.warn("gRPC event stream interrupted, flag configurations are stale", e);
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -114,7 +119,8 @@ static void observeEventStream(final BlockingQueue<QueuePayload> writeTo,
final FlagSyncServiceStub serviceStub,
final FlagSyncServiceBlockingStub serviceBlockingStub,
final String selector,
final int deadline)
final int deadline,
final int streamDeadlineMs)
throws InterruptedException {

final BlockingQueue<GrpcResponseModel> streamReceiver = new LinkedBlockingQueue<>(QUEUE_SIZE);
Expand All @@ -128,14 +134,20 @@ static void observeEventStream(final BlockingQueue<QueuePayload> writeTo,
log.debug("Initializing sync stream request");
final SyncFlagsRequest.Builder syncRequest = SyncFlagsRequest.newBuilder();
final GetMetadataRequest.Builder metadataRequest = GetMetadataRequest.newBuilder();
GetMetadataResponse metadataResponse = GetMetadataResponse.getDefaultInstance();
GetMetadataResponse metadataResponse = GetMetadataResponse.getDefaultInstance();

if (selector != null) {
syncRequest.setSelector(selector);
}

try (CancellableContext context = Context.current().withCancellation()) {
serviceStub.syncFlags(syncRequest.build(), new GrpcStreamHandler(streamReceiver));
FlagSyncServiceStub localServiceStub = serviceStub;
if (streamDeadlineMs > 0) {
localServiceStub = localServiceStub.withDeadlineAfter(streamDeadlineMs, TimeUnit.MILLISECONDS);
}

localServiceStub.syncFlags(syncRequest.build(), new GrpcStreamHandler(streamReceiver));

try {
metadataResponse = serviceBlockingStub.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS)
.getMetadata(metadataRequest.build());
Expand All @@ -158,14 +170,21 @@ static void observeEventStream(final BlockingQueue<QueuePayload> writeTo,
}

if (response.getError() != null || metadataException != null) {
log.error(String.format("Error from initializing stream or metadata, retrying in %dms",
retryDelay), response.getError());

if (!writeTo.offer(
new QueuePayload(QueuePayloadType.ERROR, "Error from stream or metadata",
metadataResponse))) {
log.error("Failed to convey ERROR status, queue is full");
if (response.getError() instanceof StatusRuntimeException
&& ((StatusRuntimeException) response.getError()).getStatus().getCode()
.equals(Code.DEADLINE_EXCEEDED)) {
log.debug(String.format("Stream deadline reached, re-establishing in %dms",
retryDelay));
} else {
log.error(String.format("Error initializing stream or metadata, retrying in %dms",
retryDelay), response.getError());
if (!writeTo.offer(
new QueuePayload(QueuePayloadType.ERROR, "Error from stream or metadata",
metadataResponse))) {
log.error("Failed to convey ERROR status, queue is full");
}
}

// close the context to cancel the stream in case just the metadata call failed
context.cancel(metadataException);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -24,6 +25,8 @@

import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
import dev.openfeature.flagd.grpc.evaluation.Evaluation.EventStreamResponse;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;

class EventStreamObserverTest {

Expand Down Expand Up @@ -83,6 +86,15 @@ public void reconnections() {
assertFalse(states.get(0));
}

@Test
public void deadlineExceeded() {
stream.onError(new StatusRuntimeException(Status.DEADLINE_EXCEEDED));
// we flush the cache
verify(cache, never()).clear();
// we notify the error
assertEquals(0, states.size());
}

@Test
public void cacheBustingForKnownKeys() {
final String key1 = "myKey1";
Expand Down
Loading

0 comments on commit 9de03df

Please sign in to comment.