From 39bcab6b7e6a933c81a0af091e0fb078954966c8 Mon Sep 17 00:00:00 2001 From: utkarsh Date: Sat, 24 Aug 2024 19:07:02 +0530 Subject: [PATCH] fix:passing changed flags in configuration change event --- .../providers/flagd/FlagdProvider.java | 16 ++++++- .../resolver/process/InProcessResolver.java | 16 ++++--- .../resolver/process/storage/FlagStore.java | 26 +++++++++++ .../resolver/process/storage/Storage.java | 3 ++ .../process/InProcessResolverTest.java | 44 +++++++++---------- .../flagd/resolver/process/MockStorage.java | 5 +++ 6 files changed, 80 insertions(+), 30 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java index 24f614656..f9effedd9 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java @@ -4,6 +4,7 @@ import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver; import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache; import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver; +import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag; import dev.openfeature.sdk.EvaluationContext; import dev.openfeature.sdk.EventProvider; import dev.openfeature.sdk.FeatureProvider; @@ -14,9 +15,12 @@ import dev.openfeature.sdk.Value; import lombok.extern.slf4j.Slf4j; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; /** * OpenFeature provider for flagd. @@ -33,6 +37,7 @@ public class FlagdProvider extends EventProvider implements FeatureProvider { private EvaluationContext evaluationContext; + private Map changedFlags = new HashMap<>(); protected final void finalize() { // DO NOT REMOVE, spotbugs: CT_CONSTRUCTOR_THROW } @@ -52,7 +57,7 @@ public FlagdProvider() { public FlagdProvider(final FlagdOptions options) { switch (options.getResolverType().asString()) { case Config.RESOLVER_IN_PROCESS: - this.flagResolver = new InProcessResolver(options, this::setState); + this.flagResolver = new InProcessResolver(options, this::setState, this::updateFlags); break; case Config.RESOLVER_RPC: this.flagResolver = @@ -155,6 +160,10 @@ private void setState(ProviderState newState) { this.handleStateTransition(oldState, newState); } + private void updateFlags(Map changedFlags){ + this.changedFlags = changedFlags; + } + private void handleStateTransition(ProviderState oldState, ProviderState newState) { // we got initialized if (ProviderState.NOT_READY.equals(oldState) && ProviderState.READY.equals(newState)) { @@ -162,7 +171,7 @@ private void handleStateTransition(ProviderState oldState, ProviderState newStat log.debug("Init completed"); return; } - // we got shutdown, not checking oldState as behavior remains the same for shutdown + // we got shutdown, not checking oldState as behavior remains the same for shutdown if (ProviderState.NOT_READY.equals(newState)) { // nothing to do log.debug("shutdown completed"); @@ -172,6 +181,9 @@ private void handleStateTransition(ProviderState oldState, ProviderState newStat if (ProviderState.READY.equals(oldState) && ProviderState.READY.equals(newState)) { log.debug("Configuration changed"); ProviderEventDetails details = ProviderEventDetails.builder().message("configuration changed").build(); + if(!changedFlags.isEmpty()){ + details.setFlagsChanged(changedFlags.keySet().stream().collect(Collectors.toList())); + } this.emitProviderConfigurationChanged(details); return; } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java index e3a525e20..736130419 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java @@ -23,6 +23,7 @@ import dev.openfeature.sdk.exceptions.TypeMismatchError; import lombok.extern.slf4j.Slf4j; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -37,6 +38,7 @@ public class InProcessResolver implements Resolver { private final Storage flagStore; private final Consumer stateConsumer; + private final Consumer> changedFlagsConsumer; private final Operator operator; private final long deadline; private final ImmutableMetadata metadata; @@ -45,10 +47,11 @@ public class InProcessResolver implements Resolver { /** * Initialize an in-process resolver. */ - public InProcessResolver(FlagdOptions options, Consumer stateConsumer) { + public InProcessResolver(FlagdOptions options, Consumer stateConsumer, Consumer> changedFlagsConsumer) { this.flagStore = new FlagStore(getConnector(options)); this.deadline = options.getDeadline(); this.stateConsumer = stateConsumer; + this.changedFlagsConsumer = changedFlagsConsumer; this.operator = new Operator(); this.metadata = options.getSelector() == null ? null : ImmutableMetadata.builder() @@ -67,6 +70,7 @@ public void init() throws Exception { final StorageState storageState = flagStore.getStateQueue().take(); switch (storageState) { case OK: + changedFlagsConsumer.accept(flagStore.getChangedFlags()); stateConsumer.accept(ProviderState.READY); this.connected.set(true); break; @@ -107,7 +111,7 @@ public void shutdown() throws InterruptedException { * Resolve a boolean flag. */ public ProviderEvaluation booleanEvaluation(String key, Boolean defaultValue, - EvaluationContext ctx) { + EvaluationContext ctx) { return resolve(Boolean.class, key, ctx); } @@ -115,7 +119,7 @@ public ProviderEvaluation booleanEvaluation(String key, Boolean default * Resolve a string flag. */ public ProviderEvaluation stringEvaluation(String key, String defaultValue, - EvaluationContext ctx) { + EvaluationContext ctx) { return resolve(String.class, key, ctx); } @@ -123,7 +127,7 @@ public ProviderEvaluation stringEvaluation(String key, String defaultVal * Resolve a double flag. */ public ProviderEvaluation doubleEvaluation(String key, Double defaultValue, - EvaluationContext ctx) { + EvaluationContext ctx) { return resolve(Double.class, key, ctx); } @@ -131,7 +135,7 @@ public ProviderEvaluation doubleEvaluation(String key, Double defaultVal * Resolve an integer flag. */ public ProviderEvaluation integerEvaluation(String key, Integer defaultValue, - EvaluationContext ctx) { + EvaluationContext ctx) { return resolve(Integer.class, key, ctx); } @@ -161,7 +165,7 @@ static Connector getConnector(final FlagdOptions options) { } private ProviderEvaluation resolve(Class type, String key, - EvaluationContext ctx) { + EvaluationContext ctx) { final FeatureFlag flag = flagStore.getFlag(key); // missing flag diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java index ee642cca9..d1468ac1b 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java @@ -5,6 +5,7 @@ import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import java.util.HashMap; @@ -31,6 +32,9 @@ public class FlagStore implements Storage { private final BlockingQueue stateBlockingQueue = new LinkedBlockingQueue<>(1); private final Map flags = new HashMap<>(); + @Getter + private final Map changedFlags = new HashMap<>(); + private final Connector connector; private final boolean throwIfInvalid; @@ -103,6 +107,7 @@ private void streamerListener(final Connector connector) throws InterruptedExcep Map flagMap = FlagParser.parseString(take.getData(), throwIfInvalid); writeLock.lock(); try { + updateChangedFlags(flagMap); flags.clear(); flags.putAll(flagMap); } finally { @@ -132,4 +137,25 @@ private void streamerListener(final Connector connector) throws InterruptedExcep log.info("Shutting down store stream listener"); } + private void updateChangedFlags(Map newFlags) { + Map addedFeatureFlags = new HashMap<>(); + Map removedFeatureFlags = new HashMap<>(); + Map updatedFeatureFlags = new HashMap<>(); + newFlags.forEach((key, value) -> { + if (!flags.containsKey(key)) { + addedFeatureFlags.put(key, value); + } else if (flags.containsKey(key) && !value.equals(flags.get(key))) { + updatedFeatureFlags.put(key, value); + } + }); + flags.forEach((key,value) -> { + if(!newFlags.containsKey(key)) { + removedFeatureFlags.put(key, value); + } + }); + changedFlags.putAll(addedFeatureFlags); + changedFlags.putAll(removedFeatureFlags); + changedFlags.putAll(updatedFeatureFlags); + } + } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/Storage.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/Storage.java index 32337094e..021312111 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/Storage.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/Storage.java @@ -2,6 +2,7 @@ import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag; +import java.util.Map; import java.util.concurrent.BlockingQueue; /** @@ -14,5 +15,7 @@ public interface Storage { FeatureFlag getFlag(final String key); + Map getChangedFlags(); + BlockingQueue getStateQueue(); } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java index d6a063c16..ca6e462d9 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java @@ -51,7 +51,7 @@ class InProcessResolverTest { public void connectorSetup(){ // given FlagdOptions forGrpcOptions = - FlagdOptions.builder().resolverType(Config.Resolver.IN_PROCESS).host("localhost").port(8080).build(); + FlagdOptions.builder().resolverType(Config.Resolver.IN_PROCESS).host("localhost").port(8080).build(); FlagdOptions forOfflineOptions = FlagdOptions.builder().resolverType(Config.Resolver.IN_PROCESS).offlineFlagSourcePath("path").build(); FlagdOptions forCustomConnectorOptions = @@ -69,11 +69,10 @@ public void eventHandling() throws Throwable { // note - queues with adequate capacity final BlockingQueue sender = new LinkedBlockingQueue<>(5); final BlockingQueue receiver = new LinkedBlockingQueue<>(5); + final BlockingQueue> updateFlags = new LinkedBlockingQueue<>(5); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(new HashMap<>(), sender), - providerState -> { - receiver.offer(providerState); - }); + providerState -> receiver.offer(providerState), changedFlags -> updateFlags.offer(changedFlags)); // when - init and emit events Thread initThread = new Thread(() -> { @@ -107,7 +106,7 @@ public void simpleBooleanResolving() throws Exception { flagMap.put("booleanFlag", BOOLEAN_FLAG); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { - }); + },updatedFeatureFlags -> {}); // when ProviderEvaluation providerEvaluation = inProcessResolver.booleanEvaluation("booleanFlag", false, @@ -126,7 +125,7 @@ public void simpleDoubleResolving() throws Exception { flagMap.put("doubleFlag", DOUBLE_FLAG); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { - }); + },updatedFeatureFlags -> {}); // when ProviderEvaluation providerEvaluation = inProcessResolver.doubleEvaluation("doubleFlag", 0d, @@ -145,7 +144,7 @@ public void fetchIntegerAsDouble() throws Exception { flagMap.put("doubleFlag", DOUBLE_FLAG); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { - }); + },updatedFeatureFlags -> {}); // when ProviderEvaluation providerEvaluation = inProcessResolver.integerEvaluation("doubleFlag", 0, @@ -164,7 +163,7 @@ public void fetchDoubleAsInt() throws Exception { flagMap.put("integerFlag", INT_FLAG); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { - }); + },updatedFeatureFlags -> {}); // when ProviderEvaluation providerEvaluation = inProcessResolver.doubleEvaluation("integerFlag", 0d, @@ -183,7 +182,7 @@ public void simpleIntResolving() throws Exception { flagMap.put("integerFlag", INT_FLAG); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { - }); + },updatedFeatureFlags -> {}); // when ProviderEvaluation providerEvaluation = inProcessResolver.integerEvaluation("integerFlag", 0, @@ -202,7 +201,7 @@ public void simpleObjectResolving() throws Exception { flagMap.put("objectFlag", OBJECT_FLAG); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { - }); + },updatedFeatureFlags -> {}); Map typeDefault = new HashMap<>(); typeDefault.put("key", "0164"); @@ -228,7 +227,7 @@ public void missingFlag() throws Exception { final Map flagMap = new HashMap<>(); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { - }); + },updatedFeatureFlags -> {}); // when/then assertThrows(FlagNotFoundError.class, () -> { @@ -244,7 +243,7 @@ public void disabledFlag() throws Exception { flagMap.put("disabledFlag", DISABLED_FLAG); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { - }); + },updatedFeatureFlags -> {}); // when/then assertThrows(FlagNotFoundError.class, () -> { @@ -259,7 +258,7 @@ public void variantMismatchFlag() throws Exception { flagMap.put("mismatchFlag", VARIANT_MISMATCH_FLAG); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { - }); + },updatedFeatureFlags -> {}); // when/then assertThrows(TypeMismatchError.class, () -> { @@ -274,7 +273,7 @@ public void typeMismatchEvaluation() throws Exception { flagMap.put("stringFlag", BOOLEAN_FLAG); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { - }); + },updatedFeatureFlags -> {}); // when/then assertThrows(TypeMismatchError.class, () -> { @@ -289,7 +288,7 @@ public void booleanShorthandEvaluation() throws Exception { flagMap.put("shorthand", FLAG_WIH_SHORTHAND_TARGETING); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { - }); + },updatedFeatureFlags -> {}); ProviderEvaluation providerEvaluation = inProcessResolver.booleanEvaluation("shorthand", false, new ImmutableContext()); @@ -307,7 +306,7 @@ public void targetingMatchedEvaluationFlag() throws Exception { flagMap.put("stringFlag", FLAG_WIH_IF_IN_TARGET); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { - }); + },updatedFeatureFlags -> {}); // when ProviderEvaluation providerEvaluation = inProcessResolver.stringEvaluation("stringFlag", "loopAlg", @@ -326,7 +325,7 @@ public void targetingUnmatchedEvaluationFlag() throws Exception { flagMap.put("stringFlag", FLAG_WIH_IF_IN_TARGET); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { - }); + },updatedFeatureFlags -> {}); // when ProviderEvaluation providerEvaluation = inProcessResolver.stringEvaluation("stringFlag", "loopAlg", @@ -345,7 +344,7 @@ public void explicitTargetingKeyHandling() throws NoSuchFieldException, IllegalA flagMap.put("stringFlag", FLAG_WITH_TARGETING_KEY); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { - }); + },updatedFeatureFlags -> {}); // when ProviderEvaluation providerEvaluation = @@ -364,7 +363,7 @@ public void targetingErrorEvaluationFlag() throws Exception { flagMap.put("targetingErrorFlag", FLAG_WIH_INVALID_TARGET); InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), providerState -> { - }); + },updatedFeatureFlags -> {}); // when/then assertThrows(ParseError.class, () -> { @@ -396,17 +395,18 @@ public void validateMetadataInEvaluationResult() throws Exception { private InProcessResolver getInProcessResolverWth(final FlagdOptions options, final MockStorage storage) throws NoSuchFieldException, IllegalAccessException { - final InProcessResolver resolver = new InProcessResolver(options, providerState -> {}); + final InProcessResolver resolver = new InProcessResolver(options, providerState -> {},changedFlag -> {}); return injectFlagStore(resolver, storage); } private InProcessResolver getInProcessResolverWth(final MockStorage storage, - final Consumer stateConsumer) + final Consumer stateConsumer, + final Consumer> updatedFlagsConsumer) throws NoSuchFieldException, IllegalAccessException { final InProcessResolver resolver = new InProcessResolver( - FlagdOptions.builder().deadline(1000).build(), stateConsumer); + FlagdOptions.builder().deadline(1000).build(), stateConsumer, updatedFlagsConsumer); return injectFlagStore(resolver, storage); } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/MockStorage.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/MockStorage.java index 04c043bf4..2a92c642d 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/MockStorage.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/MockStorage.java @@ -35,6 +35,11 @@ public FeatureFlag getFlag(String key) { return mockFlags.get(key); } + @Override + public Map getChangedFlags() { + return mockFlags; + } + @Nullable public BlockingQueue getStateQueue() { return mockQueue;