Skip to content

Commit

Permalink
fix:passing changed flags in configuration change event
Browse files Browse the repository at this point in the history
  • Loading branch information
utkarsh committed Aug 24, 2024
1 parent a878923 commit 39bcab6
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver;
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;
import dev.openfeature.sdk.EvaluationContext;
import dev.openfeature.sdk.EventProvider;
import dev.openfeature.sdk.FeatureProvider;
Expand All @@ -14,9 +15,12 @@
import dev.openfeature.sdk.Value;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

/**
* OpenFeature provider for flagd.
Expand All @@ -33,6 +37,7 @@ public class FlagdProvider extends EventProvider implements FeatureProvider {

private EvaluationContext evaluationContext;

private Map<String, FeatureFlag> changedFlags = new HashMap<>();
protected final void finalize() {
// DO NOT REMOVE, spotbugs: CT_CONSTRUCTOR_THROW
}
Expand All @@ -52,7 +57,7 @@ public FlagdProvider() {
public FlagdProvider(final FlagdOptions options) {
switch (options.getResolverType().asString()) {
case Config.RESOLVER_IN_PROCESS:
this.flagResolver = new InProcessResolver(options, this::setState);
this.flagResolver = new InProcessResolver(options, this::setState, this::updateFlags);
break;
case Config.RESOLVER_RPC:
this.flagResolver =
Expand Down Expand Up @@ -155,14 +160,18 @@ private void setState(ProviderState newState) {
this.handleStateTransition(oldState, newState);
}

private void updateFlags(Map<String,FeatureFlag> changedFlags){
this.changedFlags = changedFlags;
}

private void handleStateTransition(ProviderState oldState, ProviderState newState) {
// we got initialized
if (ProviderState.NOT_READY.equals(oldState) && ProviderState.READY.equals(newState)) {
// nothing to do, the SDK emits the events
log.debug("Init completed");
return;
}
// we got shutdown, not checking oldState as behavior remains the same for shutdown
// we got shutdown, not checking oldState as behavior remains the same for shutdown
if (ProviderState.NOT_READY.equals(newState)) {
// nothing to do
log.debug("shutdown completed");
Expand All @@ -172,6 +181,9 @@ private void handleStateTransition(ProviderState oldState, ProviderState newStat
if (ProviderState.READY.equals(oldState) && ProviderState.READY.equals(newState)) {
log.debug("Configuration changed");
ProviderEventDetails details = ProviderEventDetails.builder().message("configuration changed").build();
if(!changedFlags.isEmpty()){
details.setFlagsChanged(changedFlags.keySet().stream().collect(Collectors.toList()));
}
this.emitProviderConfigurationChanged(details);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import dev.openfeature.sdk.exceptions.TypeMismatchError;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

Expand All @@ -37,6 +38,7 @@
public class InProcessResolver implements Resolver {
private final Storage flagStore;
private final Consumer<ProviderState> stateConsumer;
private final Consumer<Map<String,FeatureFlag>> changedFlagsConsumer;
private final Operator operator;
private final long deadline;
private final ImmutableMetadata metadata;
Expand All @@ -45,10 +47,11 @@ public class InProcessResolver implements Resolver {
/**
* Initialize an in-process resolver.
*/
public InProcessResolver(FlagdOptions options, Consumer<ProviderState> stateConsumer) {
public InProcessResolver(FlagdOptions options, Consumer<ProviderState> stateConsumer, Consumer<Map<String, FeatureFlag>> changedFlagsConsumer) {
this.flagStore = new FlagStore(getConnector(options));
this.deadline = options.getDeadline();
this.stateConsumer = stateConsumer;
this.changedFlagsConsumer = changedFlagsConsumer;
this.operator = new Operator();
this.metadata = options.getSelector() == null ? null :
ImmutableMetadata.builder()
Expand All @@ -67,6 +70,7 @@ public void init() throws Exception {
final StorageState storageState = flagStore.getStateQueue().take();
switch (storageState) {
case OK:
changedFlagsConsumer.accept(flagStore.getChangedFlags());
stateConsumer.accept(ProviderState.READY);
this.connected.set(true);
break;
Expand Down Expand Up @@ -107,31 +111,31 @@ public void shutdown() throws InterruptedException {
* Resolve a boolean flag.
*/
public ProviderEvaluation<Boolean> booleanEvaluation(String key, Boolean defaultValue,
EvaluationContext ctx) {
EvaluationContext ctx) {
return resolve(Boolean.class, key, ctx);
}

/**
* Resolve a string flag.
*/
public ProviderEvaluation<String> stringEvaluation(String key, String defaultValue,
EvaluationContext ctx) {
EvaluationContext ctx) {
return resolve(String.class, key, ctx);
}

/**
* Resolve a double flag.
*/
public ProviderEvaluation<Double> doubleEvaluation(String key, Double defaultValue,
EvaluationContext ctx) {
EvaluationContext ctx) {
return resolve(Double.class, key, ctx);
}

/**
* Resolve an integer flag.
*/
public ProviderEvaluation<Integer> integerEvaluation(String key, Integer defaultValue,
EvaluationContext ctx) {
EvaluationContext ctx) {
return resolve(Integer.class, key, ctx);
}

Expand Down Expand Up @@ -161,7 +165,7 @@ static Connector getConnector(final FlagdOptions options) {
}

private <T> ProviderEvaluation<T> resolve(Class<T> type, String key,
EvaluationContext ctx) {
EvaluationContext ctx) {
final FeatureFlag flag = flagStore.getFlag(key);

// missing flag
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
Expand All @@ -31,6 +32,9 @@ public class FlagStore implements Storage {
private final BlockingQueue<StorageState> stateBlockingQueue = new LinkedBlockingQueue<>(1);
private final Map<String, FeatureFlag> flags = new HashMap<>();

@Getter
private final Map<String, FeatureFlag> changedFlags = new HashMap<>();

private final Connector connector;
private final boolean throwIfInvalid;

Expand Down Expand Up @@ -103,6 +107,7 @@ private void streamerListener(final Connector connector) throws InterruptedExcep
Map<String, FeatureFlag> flagMap = FlagParser.parseString(take.getData(), throwIfInvalid);
writeLock.lock();
try {
updateChangedFlags(flagMap);
flags.clear();
flags.putAll(flagMap);
} finally {
Expand Down Expand Up @@ -132,4 +137,25 @@ private void streamerListener(final Connector connector) throws InterruptedExcep
log.info("Shutting down store stream listener");
}

private void updateChangedFlags(Map<String, FeatureFlag> newFlags) {
Map<String, FeatureFlag> addedFeatureFlags = new HashMap<>();
Map<String, FeatureFlag> removedFeatureFlags = new HashMap<>();
Map<String, FeatureFlag> updatedFeatureFlags = new HashMap<>();
newFlags.forEach((key, value) -> {
if (!flags.containsKey(key)) {
addedFeatureFlags.put(key, value);
} else if (flags.containsKey(key) && !value.equals(flags.get(key))) {
updatedFeatureFlags.put(key, value);
}
});
flags.forEach((key,value) -> {
if(!newFlags.containsKey(key)) {
removedFeatureFlags.put(key, value);
}
});
changedFlags.putAll(addedFeatureFlags);
changedFlags.putAll(removedFeatureFlags);
changedFlags.putAll(updatedFeatureFlags);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;

import java.util.Map;
import java.util.concurrent.BlockingQueue;

/**
Expand All @@ -14,5 +15,7 @@ public interface Storage {

FeatureFlag getFlag(final String key);

Map<String, FeatureFlag> getChangedFlags();

BlockingQueue<StorageState> getStateQueue();
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class InProcessResolverTest {
public void connectorSetup(){
// given
FlagdOptions forGrpcOptions =
FlagdOptions.builder().resolverType(Config.Resolver.IN_PROCESS).host("localhost").port(8080).build();
FlagdOptions.builder().resolverType(Config.Resolver.IN_PROCESS).host("localhost").port(8080).build();
FlagdOptions forOfflineOptions =
FlagdOptions.builder().resolverType(Config.Resolver.IN_PROCESS).offlineFlagSourcePath("path").build();
FlagdOptions forCustomConnectorOptions =
Expand All @@ -69,11 +69,10 @@ public void eventHandling() throws Throwable {
// note - queues with adequate capacity
final BlockingQueue<StorageState> sender = new LinkedBlockingQueue<>(5);
final BlockingQueue<ProviderState> receiver = new LinkedBlockingQueue<>(5);
final BlockingQueue<Map<String,FeatureFlag>> updateFlags = new LinkedBlockingQueue<>(5);

InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(new HashMap<>(), sender),
providerState -> {
receiver.offer(providerState);
});
providerState -> receiver.offer(providerState), changedFlags -> updateFlags.offer(changedFlags));

// when - init and emit events
Thread initThread = new Thread(() -> {
Expand Down Expand Up @@ -107,7 +106,7 @@ public void simpleBooleanResolving() throws Exception {
flagMap.put("booleanFlag", BOOLEAN_FLAG);

InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> {
});
},updatedFeatureFlags -> {});

// when
ProviderEvaluation<Boolean> providerEvaluation = inProcessResolver.booleanEvaluation("booleanFlag", false,
Expand All @@ -126,7 +125,7 @@ public void simpleDoubleResolving() throws Exception {
flagMap.put("doubleFlag", DOUBLE_FLAG);

InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> {
});
},updatedFeatureFlags -> {});

// when
ProviderEvaluation<Double> providerEvaluation = inProcessResolver.doubleEvaluation("doubleFlag", 0d,
Expand All @@ -145,7 +144,7 @@ public void fetchIntegerAsDouble() throws Exception {
flagMap.put("doubleFlag", DOUBLE_FLAG);

InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> {
});
},updatedFeatureFlags -> {});

// when
ProviderEvaluation<Integer> providerEvaluation = inProcessResolver.integerEvaluation("doubleFlag", 0,
Expand All @@ -164,7 +163,7 @@ public void fetchDoubleAsInt() throws Exception {
flagMap.put("integerFlag", INT_FLAG);

InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> {
});
},updatedFeatureFlags -> {});

// when
ProviderEvaluation<Double> providerEvaluation = inProcessResolver.doubleEvaluation("integerFlag", 0d,
Expand All @@ -183,7 +182,7 @@ public void simpleIntResolving() throws Exception {
flagMap.put("integerFlag", INT_FLAG);

InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> {
});
},updatedFeatureFlags -> {});

// when
ProviderEvaluation<Integer> providerEvaluation = inProcessResolver.integerEvaluation("integerFlag", 0,
Expand All @@ -202,7 +201,7 @@ public void simpleObjectResolving() throws Exception {
flagMap.put("objectFlag", OBJECT_FLAG);

InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> {
});
},updatedFeatureFlags -> {});

Map<String, Object> typeDefault = new HashMap<>();
typeDefault.put("key", "0164");
Expand All @@ -228,7 +227,7 @@ public void missingFlag() throws Exception {
final Map<String, FeatureFlag> flagMap = new HashMap<>();

InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> {
});
},updatedFeatureFlags -> {});

// when/then
assertThrows(FlagNotFoundError.class, () -> {
Expand All @@ -244,7 +243,7 @@ public void disabledFlag() throws Exception {
flagMap.put("disabledFlag", DISABLED_FLAG);

InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> {
});
},updatedFeatureFlags -> {});

// when/then
assertThrows(FlagNotFoundError.class, () -> {
Expand All @@ -259,7 +258,7 @@ public void variantMismatchFlag() throws Exception {
flagMap.put("mismatchFlag", VARIANT_MISMATCH_FLAG);

InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> {
});
},updatedFeatureFlags -> {});

// when/then
assertThrows(TypeMismatchError.class, () -> {
Expand All @@ -274,7 +273,7 @@ public void typeMismatchEvaluation() throws Exception {
flagMap.put("stringFlag", BOOLEAN_FLAG);

InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> {
});
},updatedFeatureFlags -> {});

// when/then
assertThrows(TypeMismatchError.class, () -> {
Expand All @@ -289,7 +288,7 @@ public void booleanShorthandEvaluation() throws Exception {
flagMap.put("shorthand", FLAG_WIH_SHORTHAND_TARGETING);

InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> {
});
},updatedFeatureFlags -> {});

ProviderEvaluation<Boolean> providerEvaluation = inProcessResolver.booleanEvaluation("shorthand", false,
new ImmutableContext());
Expand All @@ -307,7 +306,7 @@ public void targetingMatchedEvaluationFlag() throws Exception {
flagMap.put("stringFlag", FLAG_WIH_IF_IN_TARGET);

InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> {
});
},updatedFeatureFlags -> {});

// when
ProviderEvaluation<String> providerEvaluation = inProcessResolver.stringEvaluation("stringFlag", "loopAlg",
Expand All @@ -326,7 +325,7 @@ public void targetingUnmatchedEvaluationFlag() throws Exception {
flagMap.put("stringFlag", FLAG_WIH_IF_IN_TARGET);

InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> {
});
},updatedFeatureFlags -> {});

// when
ProviderEvaluation<String> providerEvaluation = inProcessResolver.stringEvaluation("stringFlag", "loopAlg",
Expand All @@ -345,7 +344,7 @@ public void explicitTargetingKeyHandling() throws NoSuchFieldException, IllegalA
flagMap.put("stringFlag", FLAG_WITH_TARGETING_KEY);

InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> {
});
},updatedFeatureFlags -> {});

// when
ProviderEvaluation<String> providerEvaluation =
Expand All @@ -364,7 +363,7 @@ public void targetingErrorEvaluationFlag() throws Exception {
flagMap.put("targetingErrorFlag", FLAG_WIH_INVALID_TARGET);

InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> {
});
},updatedFeatureFlags -> {});

// when/then
assertThrows(ParseError.class, () -> {
Expand Down Expand Up @@ -396,17 +395,18 @@ public void validateMetadataInEvaluationResult() throws Exception {
private InProcessResolver getInProcessResolverWth(final FlagdOptions options, final MockStorage storage)
throws NoSuchFieldException, IllegalAccessException {

final InProcessResolver resolver = new InProcessResolver(options, providerState -> {});
final InProcessResolver resolver = new InProcessResolver(options, providerState -> {},changedFlag -> {});
return injectFlagStore(resolver, storage);
}


private InProcessResolver getInProcessResolverWth(final MockStorage storage,
final Consumer<ProviderState> stateConsumer)
final Consumer<ProviderState> stateConsumer,
final Consumer<Map<String,FeatureFlag>> updatedFlagsConsumer)
throws NoSuchFieldException, IllegalAccessException {

final InProcessResolver resolver = new InProcessResolver(
FlagdOptions.builder().deadline(1000).build(), stateConsumer);
FlagdOptions.builder().deadline(1000).build(), stateConsumer, updatedFlagsConsumer);
return injectFlagStore(resolver, storage);
}

Expand Down
Loading

0 comments on commit 39bcab6

Please sign in to comment.