Skip to content

Commit

Permalink
fixup: fixing inprocess and eventing
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Schrottner <[email protected]>
  • Loading branch information
aepfli committed Jan 15, 2025
1 parent 82db146 commit 2b81d5b
Show file tree
Hide file tree
Showing 24 changed files with 247 additions and 321 deletions.
1 change: 1 addition & 0 deletions providers/flagd/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.16</version>
<scope>test</scope>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package dev.openfeature.contrib.providers.flagd;

import dev.openfeature.contrib.providers.flagd.resolver.Resolver;
import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionEvent;
import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent;
import dev.openfeature.contrib.providers.flagd.resolver.common.Util;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver;
Expand All @@ -12,12 +13,17 @@
import dev.openfeature.sdk.ImmutableStructure;
import dev.openfeature.sdk.Metadata;
import dev.openfeature.sdk.ProviderEvaluation;
import dev.openfeature.sdk.ProviderEvent;
import dev.openfeature.sdk.ProviderEventDetails;
import dev.openfeature.sdk.Structure;
import dev.openfeature.sdk.Value;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -31,10 +37,29 @@ public class FlagdProvider extends EventProvider {
private static final String FLAGD_PROVIDER = "flagd";
private final Resolver flagResolver;
private volatile boolean initialized = false;
private volatile boolean connected = false;
private volatile Structure syncMetadata = new ImmutableStructure();
private volatile EvaluationContext enrichedContext = new ImmutableContext();
private final List<Hook> hooks = new ArrayList<>();
private volatile ProviderEvent previousEvent = null;

/**
* An executor service responsible for scheduling reconnection attempts.
*/
private final ScheduledExecutorService reconnectExecutor;

/**
* A scheduled task for managing reconnection attempts.
*/
private ScheduledFuture<?> reconnectTask;

/**
* The grace period in milliseconds to wait for reconnection before emitting an error event.
*/
private final long gracePeriod;
/**
* The deadline in milliseconds for GRPC operations.
*/
private final long deadline;

protected final void finalize() {
// DO NOT REMOVE, spotbugs: CT_CONSTRUCTOR_THROW
Expand All @@ -55,18 +80,21 @@ public FlagdProvider() {
public FlagdProvider(final FlagdOptions options) {
switch (options.getResolverType().asString()) {
case Config.RESOLVER_IN_PROCESS:
this.flagResolver = new InProcessResolver(options, this::isConnected, this::onConnectionEvent);
this.flagResolver = new InProcessResolver(options, this::onProviderEvent);
break;
case Config.RESOLVER_RPC:
this.flagResolver = new GrpcResolver(
options, new Cache(options.getCacheType(), options.getMaxCacheSize()), this::onConnectionEvent);
options, new Cache(options.getCacheType(), options.getMaxCacheSize()), this::onProviderEvent);
break;
default:
throw new IllegalStateException(
String.format("Requested unsupported resolver type of %s", options.getResolverType()));
}
hooks.add(new SyncMetadataHook(this::getEnrichedContext));
contextEnricher = options.getContextEnricher();
this.reconnectExecutor = Executors.newSingleThreadScheduledExecutor();
this.gracePeriod = options.getRetryGracePeriod();
this.deadline = options.getDeadline();
}

@Override
Expand All @@ -81,17 +109,22 @@ public synchronized void initialize(EvaluationContext evaluationContext) throws
}

this.flagResolver.init();
this.initialized = this.connected = true;
// block till ready - this works with deadline fine for rpc, but with in_process we also need to take parsing
// into the equation
Util.busyWaitAndCheck(this.deadline + 1000, () -> initialized);
}

@Override
public synchronized void shutdown() {
if (!this.initialized) {
return;
}

try {
this.flagResolver.shutdown();
if (reconnectExecutor != null) {
reconnectExecutor.shutdownNow();
reconnectExecutor.awaitTermination(deadline, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
log.error("Error during shutdown {}", FLAGD_PROVIDER, e);
} finally {
Expand Down Expand Up @@ -151,47 +184,73 @@ EvaluationContext getEnrichedContext() {
return enrichedContext;
}

private boolean isConnected() {
return this.connected;
}
private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {

private void onConnectionEvent(ConnectionEvent connectionEvent) {
final boolean wasConnected = connected;
final boolean isConnected = connected = connectionEvent.isConnected();
syncMetadata = flagdProviderEvent.getSyncMetadata();
if (flagdProviderEvent.getSyncMetadata() != null) {
enrichedContext = contextEnricher.apply(flagdProviderEvent.getSyncMetadata());
}

syncMetadata = connectionEvent.getSyncMetadata();
enrichedContext = contextEnricher.apply(connectionEvent.getSyncMetadata());
switch (flagdProviderEvent.getEvent()) {
case PROVIDER_CONFIGURATION_CHANGED:
if (previousEvent == ProviderEvent.PROVIDER_READY) {
this.emitProviderConfigurationChanged(ProviderEventDetails.builder()
.flagsChanged(flagdProviderEvent.getFlagsChanged())
.message("configuration changed")
.build());
break;
}
case PROVIDER_READY:
onReady();
previousEvent = ProviderEvent.PROVIDER_READY;
break;

if (!initialized) {
return;
case PROVIDER_ERROR:
if (previousEvent != ProviderEvent.PROVIDER_ERROR) {
onError();
}
previousEvent = ProviderEvent.PROVIDER_ERROR;
break;
}
}

if (!wasConnected && isConnected) {
ProviderEventDetails details = ProviderEventDetails.builder()
.flagsChanged(connectionEvent.getFlagsChanged())
.message("connected to flagd")
.build();
this.emitProviderReady(details);
return;
private void onReady() {
if (!initialized) {
initialized = true;
log.info("initialized FlagdProvider");
}
if (reconnectTask != null && !reconnectTask.isCancelled()) {
reconnectTask.cancel(false);
log.debug("Reconnection task cancelled as connection became READY.");
}
this.emitProviderReady(
ProviderEventDetails.builder().message("connected to flagd").build());
}

if (wasConnected && isConnected) {
ProviderEventDetails details = ProviderEventDetails.builder()
.flagsChanged(connectionEvent.getFlagsChanged())
.message("configuration changed")
.build();
this.emitProviderConfigurationChanged(details);
return;
private void onError() {
log.info("Connection lost. Emit STALE event...");
log.debug("Waiting {}s for connection to become available...", gracePeriod);
this.emitProviderStale(ProviderEventDetails.builder()
.message("there has been an error")
.build());

if (reconnectTask != null && !reconnectTask.isCancelled()) {
reconnectTask.cancel(false);
}

if (connectionEvent.isStale()) {
this.emitProviderStale(ProviderEventDetails.builder()
.message("there has been an error")
.build());
} else {
this.emitProviderError(ProviderEventDetails.builder()
.message("there has been an error")
.build());
if (!reconnectExecutor.isShutdown()) {
reconnectTask = reconnectExecutor.schedule(
() -> {
log.debug(
"Provider did not reconnect successfully within {}s. Emit ERROR event...", gracePeriod);
flagResolver.onError();
this.emitProviderError(ProviderEventDetails.builder()
.message("there has been an error")
.build());
;
},
gracePeriod,
TimeUnit.SECONDS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ public interface Resolver {

void shutdown() throws Exception;

default void onError() {}

ProviderEvaluation<Boolean> booleanEvaluation(String key, Boolean defaultValue, EvaluationContext ctx);

ProviderEvaluation<String> stringEvaluation(String key, String defaultValue, EvaluationContext ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public static void monitorChannelState(
Runnable onConnectionLost) {
channel.notifyWhenStateChanged(expectedState, () -> {
ConnectivityState currentState = channel.getState(true);
log.info("Channel state changed to: {}", currentState);
// log.info("Channel state changed to: {}", currentState);
if (currentState == ConnectivityState.READY) {
if (onConnectionReady != null) {
onConnectionReady.run();
Expand Down

This file was deleted.

Loading

0 comments on commit 2b81d5b

Please sign in to comment.