Skip to content

Commit

Permalink
fix:passing changed flags in configuration change event
Browse files Browse the repository at this point in the history
Signed-off-by: utkarsh <[email protected]>
  • Loading branch information
utkarsh authored and toddbaert committed Sep 5, 2024
1 parent a878923 commit d86fbf4
Show file tree
Hide file tree
Showing 11 changed files with 147 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver;
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;
import dev.openfeature.sdk.EvaluationContext;
import dev.openfeature.sdk.EventProvider;
import dev.openfeature.sdk.FeatureProvider;
Expand All @@ -14,6 +15,7 @@
import dev.openfeature.sdk.Value;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -142,7 +144,7 @@ private EvaluationContext mergeContext(final EvaluationContext clientCallCtx) {
return clientCallCtx;
}

private void setState(ProviderState newState) {
private void setState(ProviderState newState, List<String> changedFlagsKeys) {
ProviderState oldState;
Lock l = this.lock.writeLock();
try {
Expand All @@ -152,17 +154,17 @@ private void setState(ProviderState newState) {
} finally {
l.unlock();
}
this.handleStateTransition(oldState, newState);
this.handleStateTransition(oldState, newState, changedFlagsKeys);
}

private void handleStateTransition(ProviderState oldState, ProviderState newState) {
private void handleStateTransition(ProviderState oldState, ProviderState newState, List<String> changedFlagKeys) {
// we got initialized
if (ProviderState.NOT_READY.equals(oldState) && ProviderState.READY.equals(newState)) {
// nothing to do, the SDK emits the events
log.debug("Init completed");
return;
}
// we got shutdown, not checking oldState as behavior remains the same for shutdown
// we got shutdown, not checking oldState as behavior remains the same for shutdown
if (ProviderState.NOT_READY.equals(newState)) {
// nothing to do
log.debug("shutdown completed");
Expand All @@ -172,6 +174,9 @@ private void handleStateTransition(ProviderState oldState, ProviderState newStat
if (ProviderState.READY.equals(oldState) && ProviderState.READY.equals(newState)) {
log.debug("Configuration changed");
ProviderEventDetails details = ProviderEventDetails.builder().message("configuration changed").build();
if (!changedFlagKeys.isEmpty()) {
details.setFlagsChanged(changedFlagKeys);
}
this.emitProviderConfigurationChanged(details);
return;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package dev.openfeature.contrib.providers.flagd.resolver.grpc;

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import java.util.function.BiConsumer;
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder;
import dev.openfeature.contrib.providers.flagd.resolver.common.Util;
Expand Down Expand Up @@ -37,7 +37,7 @@ public class GrpcConnector {
private final long deadline;

private final Cache cache;
private final Consumer<ProviderState> stateConsumer;
private final BiConsumer<ProviderState, List<String>> stateConsumer;

private int eventStreamAttempt = 1;
private int eventStreamRetryBackoff;
Expand All @@ -52,7 +52,7 @@ public class GrpcConnector {
* @param cache cache to use.
* @param stateConsumer lambda to call for setting the state.
*/
public GrpcConnector(final FlagdOptions options, final Cache cache, Consumer<ProviderState> stateConsumer) {
public GrpcConnector(final FlagdOptions options, final Cache cache, BiConsumer<ProviderState, List<String>> stateConsumer) {
this.channel = ChannelBuilder.nettyChannel(options);
this.serviceStub = ServiceGrpc.newStub(channel);
this.serviceBlockingStub = ServiceGrpc.newBlockingStub(channel);
Expand Down Expand Up @@ -100,7 +100,7 @@ public void shutdown() throws Exception {
this.channel.awaitTermination(this.deadline, TimeUnit.MILLISECONDS);
log.warn(String.format("Unable to shut down channel by %d deadline", this.deadline));
}
this.stateConsumer.accept(ProviderState.NOT_READY);
this.stateConsumer.accept(ProviderState.NOT_READY, null);
}
}

Expand Down Expand Up @@ -162,6 +162,6 @@ private void grpcStateConsumer(final ProviderState state) {
}

// chain to initiator
this.stateConsumer.accept(state);
this.stateConsumer.accept(state, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -62,7 +63,7 @@ public final class GrpcResolver implements Resolver {
* @param stateConsumer lambda to communicate back the state.
*/
public GrpcResolver(final FlagdOptions options, final Cache cache, final Supplier<ProviderState> stateSupplier,
final Consumer<ProviderState> stateConsumer) {
final BiConsumer<ProviderState,List<String>> stateConsumer) {
this.cache = cache;
this.stateSupplier = stateSupplier;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@
import dev.openfeature.sdk.exceptions.TypeMismatchError;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag.EMPTY_TARGETING_STRING;

Expand All @@ -36,7 +41,7 @@
@Slf4j
public class InProcessResolver implements Resolver {
private final Storage flagStore;
private final Consumer<ProviderState> stateConsumer;
private final BiConsumer<ProviderState, List<String>> stateConsumer;
private final Operator operator;
private final long deadline;
private final ImmutableMetadata metadata;
Expand All @@ -45,7 +50,7 @@ public class InProcessResolver implements Resolver {
/**
* Initialize an in-process resolver.
*/
public InProcessResolver(FlagdOptions options, Consumer<ProviderState> stateConsumer) {
public InProcessResolver(FlagdOptions options, BiConsumer<ProviderState, List<String>> stateConsumer) {
this.flagStore = new FlagStore(getConnector(options));
this.deadline = options.getDeadline();
this.stateConsumer = stateConsumer;
Expand All @@ -67,11 +72,12 @@ public void init() throws Exception {
final StorageState storageState = flagStore.getStateQueue().take();
switch (storageState) {
case OK:
stateConsumer.accept(ProviderState.READY);
stateConsumer.accept(ProviderState.READY,
flagStore.getChangedFlags().keySet().stream().collect(Collectors.toList()));
this.connected.set(true);
break;
case ERROR:
stateConsumer.accept(ProviderState.ERROR);
stateConsumer.accept(ProviderState.ERROR,null);
this.connected.set(false);
break;
case STALE:
Expand Down Expand Up @@ -100,38 +106,38 @@ public void init() throws Exception {
public void shutdown() throws InterruptedException {
flagStore.shutdown();
this.connected.set(false);
stateConsumer.accept(ProviderState.NOT_READY);
stateConsumer.accept(ProviderState.NOT_READY,null);
}

/**
* Resolve a boolean flag.
*/
public ProviderEvaluation<Boolean> booleanEvaluation(String key, Boolean defaultValue,
EvaluationContext ctx) {
EvaluationContext ctx) {
return resolve(Boolean.class, key, ctx);
}

/**
* Resolve a string flag.
*/
public ProviderEvaluation<String> stringEvaluation(String key, String defaultValue,
EvaluationContext ctx) {
EvaluationContext ctx) {
return resolve(String.class, key, ctx);
}

/**
* Resolve a double flag.
*/
public ProviderEvaluation<Double> doubleEvaluation(String key, Double defaultValue,
EvaluationContext ctx) {
EvaluationContext ctx) {
return resolve(Double.class, key, ctx);
}

/**
* Resolve an integer flag.
*/
public ProviderEvaluation<Integer> integerEvaluation(String key, Integer defaultValue,
EvaluationContext ctx) {
EvaluationContext ctx) {
return resolve(Integer.class, key, ctx);
}

Expand Down Expand Up @@ -161,7 +167,7 @@ static Connector getConnector(final FlagdOptions options) {
}

private <T> ProviderEvaluation<T> resolve(Class<T> type, String key,
EvaluationContext ctx) {
EvaluationContext ctx) {
final FeatureFlag flag = flagStore.getFlag(key);

// missing flag
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
Expand All @@ -31,6 +32,9 @@ public class FlagStore implements Storage {
private final BlockingQueue<StorageState> stateBlockingQueue = new LinkedBlockingQueue<>(1);
private final Map<String, FeatureFlag> flags = new HashMap<>();

@Getter
private final Map<String, FeatureFlag> changedFlags = new HashMap<>();

private final Connector connector;
private final boolean throwIfInvalid;

Expand Down Expand Up @@ -103,6 +107,7 @@ private void streamerListener(final Connector connector) throws InterruptedExcep
Map<String, FeatureFlag> flagMap = FlagParser.parseString(take.getData(), throwIfInvalid);
writeLock.lock();
try {
updateChangedFlags(flagMap);
flags.clear();
flags.putAll(flagMap);
} finally {
Expand Down Expand Up @@ -132,4 +137,26 @@ private void streamerListener(final Connector connector) throws InterruptedExcep
log.info("Shutting down store stream listener");
}

private void updateChangedFlags(Map<String, FeatureFlag> newFlags) {
changedFlags.clear();
Map<String, FeatureFlag> addedFeatureFlags = new HashMap<>();
Map<String, FeatureFlag> removedFeatureFlags = new HashMap<>();
Map<String, FeatureFlag> updatedFeatureFlags = new HashMap<>();
newFlags.forEach((key, value) -> {
if (!flags.containsKey(key)) {
addedFeatureFlags.put(key, value);
} else if (flags.containsKey(key) && !value.equals(flags.get(key))) {
updatedFeatureFlags.put(key, value);
}
});
flags.forEach((key,value) -> {
if(!newFlags.containsKey(key)) {
removedFeatureFlags.put(key, value);
}
});
changedFlags.putAll(addedFeatureFlags);
changedFlags.putAll(removedFeatureFlags);
changedFlags.putAll(updatedFeatureFlags);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;

import java.util.Map;
import java.util.concurrent.BlockingQueue;

/**
Expand All @@ -14,5 +15,7 @@ public interface Storage {

FeatureFlag getFlag(final String key);

Map<String, FeatureFlag> getChangedFlags();

BlockingQueue<StorageState> getStateQueue();
}
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ void invalidate_cache() {
.thenReturn(serviceStubMock);

final Cache cache = new Cache("lru", 5);
grpc = new GrpcConnector(FlagdOptions.builder().build(), cache, state -> {
grpc = new GrpcConnector(FlagdOptions.builder().build(), cache, (state,changedFlagKeys) -> {
});
}

Expand Down Expand Up @@ -714,7 +714,7 @@ void disabled_cache() {
mockStaticService.when(() -> ServiceGrpc.newStub(any()))
.thenReturn(serviceStubMock);

grpc = new GrpcConnector(FlagdOptions.builder().build(), cache, state -> {
grpc = new GrpcConnector(FlagdOptions.builder().build(), cache, (state,changedFlagKeys) -> {
});
}

Expand Down Expand Up @@ -888,7 +888,7 @@ private FlagdProvider createProvider(GrpcConnector grpc, Supplier<ProviderState>
private FlagdProvider createProvider(GrpcConnector grpc, Cache cache, Supplier<ProviderState> getState) {
final FlagdOptions flagdOptions = FlagdOptions.builder().build();
final GrpcResolver grpcResolver =
new GrpcResolver(flagdOptions, cache, getState, (providerState) -> {
new GrpcResolver(flagdOptions, cache, getState, (providerState,changedFlagKeys) -> {
});

final FlagdProvider provider = new FlagdProvider();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void validate_retry_calls(int retries) throws NoSuchFieldException, IllegalAcces
final ServiceGrpc.ServiceStub mockStub = mock(ServiceGrpc.ServiceStub.class);
doAnswer(invocation -> null).when(mockStub).eventStream(any(), any());

final GrpcConnector connector = new GrpcConnector(options, cache, (state) -> {
final GrpcConnector connector = new GrpcConnector(options, cache, (state,changedFlagKeys) -> {
});

Field serviceStubField = GrpcConnector.class.getDeclaredField("serviceStub");
Expand Down Expand Up @@ -93,7 +93,7 @@ void initialization_succeed_with_connected_status() throws NoSuchFieldException,
final ServiceGrpc.ServiceStub mockStub = mock(ServiceGrpc.ServiceStub.class);
doAnswer(invocation -> null).when(mockStub).eventStream(any(), any());

final GrpcConnector connector = new GrpcConnector(FlagdOptions.builder().build(), cache, (state) -> {
final GrpcConnector connector = new GrpcConnector(FlagdOptions.builder().build(), cache, (state,changedFlagKeys) -> {
});

Field serviceStubField = GrpcConnector.class.getDeclaredField("serviceStub");
Expand All @@ -117,7 +117,7 @@ void initialization_fail_with_timeout() throws Exception {
final ServiceGrpc.ServiceStub mockStub = mock(ServiceGrpc.ServiceStub.class);
doAnswer(invocation -> null).when(mockStub).eventStream(any(), any());

final GrpcConnector connector = new GrpcConnector(FlagdOptions.builder().build(), cache, (state) -> {
final GrpcConnector connector = new GrpcConnector(FlagdOptions.builder().build(), cache, (state,changedFlagKeys) -> {
});

Field serviceStubField = GrpcConnector.class.getDeclaredField("serviceStub");
Expand Down
Loading

0 comments on commit d86fbf4

Please sign in to comment.