Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: rpc caching not behaving as expected (cleared too often) #1115

Merged
merged 15 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
[submodule "providers/flagd/test-harness"]
path = providers/flagd/test-harness
url = https://github.com/open-feature/test-harness.git
branch = v0.5.21
branch = v1.1.1
[submodule "providers/flagd/spec"]
path = providers/flagd/spec
url = https://github.com/open-feature/spec.git
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.cucumber</groupId>
<artifactId>cucumber-picocontainer</artifactId>
<version>7.20.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
Expand Down
13 changes: 13 additions & 0 deletions providers/flagd/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,20 @@
<version>1.20.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>toxiproxy</artifactId>
<version>1.20.4</version>
<scope>test</scope>
</dependency>
<!-- uncomment for log output during test runs -->

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.16</version>
<scope>test</scope>
</dependency>
</dependencies>

<dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package dev.openfeature.contrib.providers.flagd;

import dev.openfeature.contrib.providers.flagd.resolver.Resolver;
import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionEvent;
import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent;
import dev.openfeature.contrib.providers.flagd.resolver.common.Util;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver;
Expand All @@ -12,12 +13,17 @@
import dev.openfeature.sdk.ImmutableStructure;
import dev.openfeature.sdk.Metadata;
import dev.openfeature.sdk.ProviderEvaluation;
import dev.openfeature.sdk.ProviderEvent;
import dev.openfeature.sdk.ProviderEventDetails;
import dev.openfeature.sdk.Structure;
import dev.openfeature.sdk.Value;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -31,10 +37,32 @@ public class FlagdProvider extends EventProvider {
private static final String FLAGD_PROVIDER = "flagd";
private final Resolver flagResolver;
private volatile boolean initialized = false;
private volatile boolean connected = false;
private volatile Structure syncMetadata = new ImmutableStructure();
private volatile EvaluationContext enrichedContext = new ImmutableContext();
private final List<Hook> hooks = new ArrayList<>();
private volatile ProviderEvent previousEvent = null;
private final Object eventLock;

/**
* An executor service responsible for emitting {@link ProviderEvent#PROVIDER_ERROR} after the provider went
* {@link ProviderEvent#PROVIDER_STALE} for {@link #gracePeriod} seconds.
*/
private final ScheduledExecutorService errorExecutor;

/**
* A scheduled task for emitting {@link ProviderEvent#PROVIDER_ERROR}.
*/
private ScheduledFuture<?> errorTask;

/**
* The grace period in seconds to wait after {@link ProviderEvent#PROVIDER_STALE} before emitting a
* {@link ProviderEvent#PROVIDER_ERROR}.
*/
private final long gracePeriod;
/**
* The deadline in milliseconds for GRPC operations.
*/
private final long deadline;

protected final void finalize() {
// DO NOT REMOVE, spotbugs: CT_CONSTRUCTOR_THROW
Expand All @@ -55,18 +83,22 @@ public FlagdProvider() {
public FlagdProvider(final FlagdOptions options) {
switch (options.getResolverType().asString()) {
case Config.RESOLVER_IN_PROCESS:
this.flagResolver = new InProcessResolver(options, this::isConnected, this::onConnectionEvent);
this.flagResolver = new InProcessResolver(options, this::onProviderEvent);
break;
case Config.RESOLVER_RPC:
this.flagResolver = new GrpcResolver(
options, new Cache(options.getCacheType(), options.getMaxCacheSize()), this::onConnectionEvent);
options, new Cache(options.getCacheType(), options.getMaxCacheSize()), this::onProviderEvent);
break;
default:
throw new IllegalStateException(
String.format("Requested unsupported resolver type of %s", options.getResolverType()));
}
hooks.add(new SyncMetadataHook(this::getEnrichedContext));
contextEnricher = options.getContextEnricher();
this.errorExecutor = Executors.newSingleThreadScheduledExecutor();
this.gracePeriod = options.getRetryGracePeriod();
this.deadline = options.getDeadline();
this.eventLock = new Object();
}

@Override
Expand All @@ -81,17 +113,23 @@ public synchronized void initialize(EvaluationContext evaluationContext) throws
}

this.flagResolver.init();
this.initialized = this.connected = true;
// Block until ready. The deadline alone works fine for rpc, but with in_process we also need to
// account for the time spent parsing.
// TODO: evaluate where we are losing time, so we can remove this magic number (follow-up)
Util.busyWaitAndCheck(this.deadline + 200, () -> initialized);
}

@Override
public synchronized void shutdown() {
if (!this.initialized) {
return;
}

try {
this.flagResolver.shutdown();
aepfli marked this conversation as resolved.
Show resolved Hide resolved
if (errorExecutor != null) {
errorExecutor.shutdownNow();
errorExecutor.awaitTermination(deadline, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
log.error("Error during shutdown {}", FLAGD_PROVIDER, e);
} finally {
Expand Down Expand Up @@ -151,47 +189,92 @@ EvaluationContext getEnrichedContext() {
return enrichedContext;
}

private boolean isConnected() {
return this.connected;
}
@SuppressWarnings("checkstyle:fallthrough")
private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {
aepfli marked this conversation as resolved.
Show resolved Hide resolved

private void onConnectionEvent(ConnectionEvent connectionEvent) {
final boolean wasConnected = connected;
final boolean isConnected = connected = connectionEvent.isConnected();
synchronized (eventLock) {
log.info("FlagdProviderEvent: {}", flagdProviderEvent);
syncMetadata = flagdProviderEvent.getSyncMetadata();
if (flagdProviderEvent.getSyncMetadata() != null) {
enrichedContext = contextEnricher.apply(flagdProviderEvent.getSyncMetadata());
}

syncMetadata = connectionEvent.getSyncMetadata();
enrichedContext = contextEnricher.apply(connectionEvent.getSyncMetadata());
/*
We only track Error and Ready as previous states.
An error is first emitted as Stale and only turns into an emitted Error after the grace period.
Ready is tracked because the InProcessResolver does not have a dedicated ready event; hence a
configuration change received while we are not in the ready state is forwarded as Ready.
*/
switch (flagdProviderEvent.getEvent()) {
case PROVIDER_CONFIGURATION_CHANGED:
if (previousEvent == ProviderEvent.PROVIDER_READY) {
onConfigurationChanged(flagdProviderEvent);
break;
}
// intentional fall through, a not-ready change will trigger a ready.
case PROVIDER_READY:
onReady();
previousEvent = ProviderEvent.PROVIDER_READY;
break;

if (!initialized) {
return;
case PROVIDER_ERROR:
if (previousEvent != ProviderEvent.PROVIDER_ERROR) {
onError();
}
previousEvent = ProviderEvent.PROVIDER_ERROR;
break;
default:
log.info("Unknown event {}", flagdProviderEvent.getEvent());
}
}
}

if (!wasConnected && isConnected) {
ProviderEventDetails details = ProviderEventDetails.builder()
.flagsChanged(connectionEvent.getFlagsChanged())
.message("connected to flagd")
.build();
this.emitProviderReady(details);
return;
/**
 * Emits a configuration-changed provider event carrying the flags reported as changed.
 *
 * @param flagdProviderEvent the resolver event whose changed-flag list is forwarded
 */
private void onConfigurationChanged(FlagdProviderEvent flagdProviderEvent) {
    final ProviderEventDetails changeDetails = ProviderEventDetails.builder()
            .flagsChanged(flagdProviderEvent.getFlagsChanged())
            .message("configuration changed")
            .build();
    this.emitProviderConfigurationChanged(changeDetails);
}

/**
 * Handles a transition into the ready state.
 *
 * <p>Marks the provider as initialized on the first call, cancels a still-pending error task (if any)
 * and finally emits the ready event.
 */
private void onReady() {
    // First ready signal: flip the flag that initialize() is waiting on.
    if (!initialized) {
        initialized = true;
        log.info("initialized FlagdProvider");
    }

    // A pending STALE -> ERROR escalation is obsolete once we are ready again.
    final ScheduledFuture<?> pendingErrorTask = errorTask;
    final boolean hasLiveErrorTask = pendingErrorTask != null && !pendingErrorTask.isCancelled();
    if (hasLiveErrorTask) {
        pendingErrorTask.cancel(false);
        log.debug("Reconnection task cancelled as connection became READY.");
    }

    final ProviderEventDetails readyDetails =
            ProviderEventDetails.builder().message("connected to flagd").build();
    this.emitProviderReady(readyDetails);
}

if (wasConnected && isConnected) {
ProviderEventDetails details = ProviderEventDetails.builder()
.flagsChanged(connectionEvent.getFlagsChanged())
.message("configuration changed")
.build();
this.emitProviderConfigurationChanged(details);
return;
private void onError() {
log.info("Connection lost. Emit STALE event...");
log.debug("Waiting {}s for connection to become available...", gracePeriod);
this.emitProviderStale(ProviderEventDetails.builder()
.message("there has been an error")
.build());

if (errorTask != null && !errorTask.isCancelled()) {
errorTask.cancel(false);
}

if (connectionEvent.isStale()) {
this.emitProviderStale(ProviderEventDetails.builder()
.message("there has been an error")
.build());
} else {
this.emitProviderError(ProviderEventDetails.builder()
.message("there has been an error")
.build());
if (!errorExecutor.isShutdown()) {
errorTask = errorExecutor.schedule(
() -> {
if (previousEvent == ProviderEvent.PROVIDER_ERROR) {
log.debug(
"Provider did not reconnect successfully within {}s. Emit ERROR event...",
gracePeriod);
flagResolver.onError();
this.emitProviderError(ProviderEventDetails.builder()
.message("there has been an error")
.build());
}
},
gracePeriod,
TimeUnit.SECONDS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ public interface Resolver {

void shutdown() throws Exception;

/**
 * Error hook for resolvers. The default implementation does nothing; implementations may override
 * it to react to an error reported by the provider.
 */
default void onError() {}

ProviderEvaluation<Boolean> booleanEvaluation(String key, Boolean defaultValue, EvaluationContext ctx);

ProviderEvaluation<String> stringEvaluation(String key, String defaultValue, EvaluationContext ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public static void monitorChannelState(
Runnable onConnectionLost) {
channel.notifyWhenStateChanged(expectedState, () -> {
ConnectivityState currentState = channel.getState(true);
log.info("Channel state changed to: {}", currentState);
log.debug("Channel state changed to: {}", currentState);
if (currentState == ConnectivityState.READY) {
if (onConnectionReady != null) {
onConnectionReady.run();
Expand Down

This file was deleted.

Loading
Loading