Skip to content

Commit

Permalink
feat: emit changed flags in configuration change event (#925)
Browse files Browse the repository at this point in the history
Signed-off-by: utkarsh <utkarsh.sharma@engati.com>
Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
Co-authored-by: utkarsh <utkarsh.sharma@engati.com>
Co-authored-by: Todd Baert <todd.baert@dynatrace.com>
3 people authored Sep 16, 2024
1 parent f53b898 commit d3de874
Showing 18 changed files with 279 additions and 144 deletions.
5 changes: 5 additions & 0 deletions providers/flagd/lombok.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# This file is needed to avoid errors throw by findbugs when working with lombok.
lombok.addSuppressWarnings = true
lombok.addLombokGeneratedAnnotation = true
config.stopBubbling = true
lombok.extern.findbugs.addSuppressFBWarnings = true
Original file line number Diff line number Diff line change
@@ -6,14 +6,14 @@
import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver;
import dev.openfeature.sdk.EvaluationContext;
import dev.openfeature.sdk.EventProvider;
import dev.openfeature.sdk.FeatureProvider;
import dev.openfeature.sdk.Metadata;
import dev.openfeature.sdk.ProviderEvaluation;
import dev.openfeature.sdk.ProviderEventDetails;
import dev.openfeature.sdk.ProviderState;
import dev.openfeature.sdk.Value;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -23,7 +23,7 @@
*/
@Slf4j
@SuppressWarnings({"PMD.TooManyStaticImports", "checkstyle:NoFinalizer"})
public class FlagdProvider extends EventProvider implements FeatureProvider {
public class FlagdProvider extends EventProvider {
private static final String FLAGD_PROVIDER = "flagD Provider";

private final ReadWriteLock lock = new ReentrantReadWriteLock();
@@ -142,7 +142,7 @@ private EvaluationContext mergeContext(final EvaluationContext clientCallCtx) {
return clientCallCtx;
}

private void setState(ProviderState newState) {
private void setState(ProviderState newState, List<String> changedFlagsKeys) {
ProviderState oldState;
Lock l = this.lock.writeLock();
try {
@@ -152,17 +152,17 @@ private void setState(ProviderState newState) {
} finally {
l.unlock();
}
this.handleStateTransition(oldState, newState);
this.handleStateTransition(oldState, newState, changedFlagsKeys);
}

private void handleStateTransition(ProviderState oldState, ProviderState newState) {
private void handleStateTransition(ProviderState oldState, ProviderState newState, List<String> changedFlagKeys) {
// we got initialized
if (ProviderState.NOT_READY.equals(oldState) && ProviderState.READY.equals(newState)) {
// nothing to do, the SDK emits the events
log.debug("Init completed");
return;
}
// we got shutdown, not checking oldState as behavior remains the same for shutdown
// we got shutdown, not checking oldState as behavior remains the same for shutdown
if (ProviderState.NOT_READY.equals(newState)) {
// nothing to do
log.debug("shutdown completed");
@@ -171,7 +171,8 @@ private void handleStateTransition(ProviderState oldState, ProviderState newStat
// configuration changed
if (ProviderState.READY.equals(oldState) && ProviderState.READY.equals(newState)) {
log.debug("Configuration changed");
ProviderEventDetails details = ProviderEventDetails.builder().message("configuration changed").build();
ProviderEventDetails details = ProviderEventDetails.builder().flagsChanged(changedFlagKeys)
.message("configuration changed").build();
this.emitProviderConfigurationChanged(details);
return;
}
Original file line number Diff line number Diff line change
@@ -8,16 +8,19 @@
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.BiConsumer;

/**
* EventStreamObserver handles events emitted by flagd.
*/
@Slf4j
@SuppressFBWarnings(justification = "cache needs to be read and write by multiple objects")
class EventStreamObserver implements StreamObserver<EventStreamResponse> {
private final Consumer<ProviderState> stateConsumer;
private final BiConsumer<ProviderState, List<String>> stateConsumer;
private final Object sync;
private final Cache cache;

@@ -28,11 +31,11 @@ class EventStreamObserver implements StreamObserver<EventStreamResponse> {
/**
* Create a gRPC stream that get notified about flag changes.
*
* @param sync synchronization object from caller
* @param cache cache to update
* @param stateConsumer lambda to call for setting the state
* @param sync synchronization object from caller
* @param cache cache to update
* @param stateConsumer lambda to call for setting the state
*/
EventStreamObserver(Object sync, Cache cache, Consumer<ProviderState> stateConsumer) {
EventStreamObserver(Object sync, Cache cache, BiConsumer<ProviderState, List<String>> stateConsumer) {
this.sync = sync;
this.cache = cache;
this.stateConsumer = stateConsumer;
@@ -58,7 +61,7 @@ public void onError(Throwable t) {
if (this.cache.getEnabled()) {
this.cache.clear();
}
this.stateConsumer.accept(ProviderState.ERROR);
this.stateConsumer.accept(ProviderState.ERROR, Collections.emptyList());

// handle last call of this stream
handleEndOfStream();
@@ -69,32 +72,38 @@ public void onCompleted() {
if (this.cache.getEnabled()) {
this.cache.clear();
}
this.stateConsumer.accept(ProviderState.ERROR);
this.stateConsumer.accept(ProviderState.ERROR, Collections.emptyList());

// handle last call of this stream
handleEndOfStream();
}

private void handleConfigurationChangeEvent(EventStreamResponse value) {
this.stateConsumer.accept(ProviderState.READY);
if (!this.cache.getEnabled()) {
return;
}
List<String> changedFlags = new ArrayList<>();
boolean cachingEnabled = this.cache.getEnabled();

Map<String, Value> data = value.getData().getFieldsMap();
Value flagsValue = data.get(FLAGS_KEY);
if (flagsValue == null) {
this.cache.clear();
return;
if (cachingEnabled) {
this.cache.clear();
}
} else {
Map<String, Value> flags = flagsValue.getStructValue().getFieldsMap();
this.cache.getEnabled();
for (String flagKey : flags.keySet()) {
changedFlags.add(flagKey);
if (cachingEnabled) {
this.cache.remove(flagKey);
}
}
}

Map<String, Value> flags = flagsValue.getStructValue().getFieldsMap();
for (String flagKey : flags.keySet()) {
this.cache.remove(flagKey);
}
this.stateConsumer.accept(ProviderState.READY, changedFlags);
}

private void handleProviderReadyEvent() {
this.stateConsumer.accept(ProviderState.READY);
this.stateConsumer.accept(ProviderState.READY, Collections.emptyList());
if (this.cache.getEnabled()) {
this.cache.clear();
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package dev.openfeature.contrib.providers.flagd.resolver.grpc;

import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import java.util.function.BiConsumer;
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder;
import dev.openfeature.contrib.providers.flagd.resolver.common.Util;
@@ -37,7 +38,7 @@ public class GrpcConnector {
private final long deadline;

private final Cache cache;
private final Consumer<ProviderState> stateConsumer;
private final BiConsumer<ProviderState, List<String>> stateConsumer;

private int eventStreamAttempt = 1;
private int eventStreamRetryBackoff;
@@ -52,7 +53,8 @@ public class GrpcConnector {
* @param cache cache to use.
* @param stateConsumer lambda to call for setting the state.
*/
public GrpcConnector(final FlagdOptions options, final Cache cache, Consumer<ProviderState> stateConsumer) {
public GrpcConnector(final FlagdOptions options, final Cache cache,
BiConsumer<ProviderState, List<String>> stateConsumer) {
this.channel = ChannelBuilder.nettyChannel(options);
this.serviceStub = ServiceGrpc.newStub(channel);
this.serviceBlockingStub = ServiceGrpc.newBlockingStub(channel);
@@ -80,7 +82,8 @@ public void initialize() throws Exception {
/**
* Shuts down all gRPC resources.
*
* @throws Exception is something goes wrong while terminating the communication.
* @throws Exception is something goes wrong while terminating the
* communication.
*/
public void shutdown() throws Exception {
// first shutdown the event listener
@@ -100,7 +103,7 @@ public void shutdown() throws Exception {
this.channel.awaitTermination(this.deadline, TimeUnit.MILLISECONDS);
log.warn(String.format("Unable to shut down channel by %d deadline", this.deadline));
}
this.stateConsumer.accept(ProviderState.NOT_READY);
this.stateConsumer.accept(ProviderState.NOT_READY, Collections.emptyList());
}
}

@@ -114,21 +117,24 @@ public ServiceGrpc.ServiceBlockingStub getResolver() {
}

/**
* Event stream observer logic. This contains blocking mechanisms, hence must be run in a dedicated thread.
* Event stream observer logic. This contains blocking mechanisms, hence must be
* run in a dedicated thread.
*/
private void observeEventStream() {
while (this.eventStreamAttempt <= this.maxEventStreamRetries) {
final StreamObserver<EventStreamResponse> responseObserver =
new EventStreamObserver(sync, this.cache, this::grpcStateConsumer);
final StreamObserver<EventStreamResponse> responseObserver = new EventStreamObserver(sync, this.cache,
this::grpcStateConsumer);
this.serviceStub.eventStream(EventStreamRequest.getDefaultInstance(), responseObserver);

try {
synchronized (sync) {
sync.wait();
}
} catch (InterruptedException e) {
// Interruptions are considered end calls for this observer, hence log and return
// Note - this is the most common interruption when shutdown, hence the log level debug
// Interruptions are considered end calls for this observer, hence log and
// return
// Note - this is the most common interruption when shutdown, hence the log
// level debug
log.debug("interruption while waiting for condition", e);
Thread.currentThread().interrupt();
}
@@ -140,17 +146,18 @@ private void observeEventStream() {
try {
Thread.sleep(this.eventStreamRetryBackoff);
} catch (InterruptedException e) {
// Interruptions are considered end calls for this observer, hence log and return
// Interruptions are considered end calls for this observer, hence log and
// return
log.warn("interrupted while restarting gRPC Event Stream");
Thread.currentThread().interrupt();
}
}

log.error("failed to connect to event stream, exhausted retries");
this.grpcStateConsumer(ProviderState.ERROR);
this.grpcStateConsumer(ProviderState.ERROR, null);
}

private void grpcStateConsumer(final ProviderState state) {
private void grpcStateConsumer(final ProviderState state, final List<String> changedFlags) {
// check for readiness
if (ProviderState.READY.equals(state)) {
this.eventStreamAttempt = 1;
@@ -162,6 +169,6 @@ private void grpcStateConsumer(final ProviderState state) {
}

// chain to initiator
this.stateConsumer.accept(state);
this.stateConsumer.accept(state, changedFlags);
}
}
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -62,7 +62,7 @@ public final class GrpcResolver implements Resolver {
* @param stateConsumer lambda to communicate back the state.
*/
public GrpcResolver(final FlagdOptions options, final Cache cache, final Supplier<ProviderState> stateSupplier,
final Consumer<ProviderState> stateConsumer) {
final BiConsumer<ProviderState,List<String>> stateConsumer) {
this.cache = cache;
this.stateSupplier = stateSupplier;

Loading

0 comments on commit d3de874

Please sign in to comment.