Skip to content

Commit

Permalink
feat: call and expose sync-metadata
Browse files Browse the repository at this point in the history
Signed-off-by: Todd Baert <[email protected]>
  • Loading branch information
toddbaert committed Sep 24, 2024
1 parent 7d66ca8 commit 59b6474
Show file tree
Hide file tree
Showing 20 changed files with 730 additions and 518 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package dev.openfeature.contrib.providers.flagd;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import dev.openfeature.contrib.providers.flagd.resolver.Resolver;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver;
Expand All @@ -24,6 +26,7 @@ public class FlagdProvider extends EventProvider {
private final Resolver flagResolver;
private volatile boolean initialized = false;
private volatile boolean connected = false;
private volatile Map<String, Object> syncMetadata = Collections.emptyMap();

private EvaluationContext evaluationContext;

Expand All @@ -47,13 +50,13 @@ public FlagdProvider(final FlagdOptions options) {
switch (options.getResolverType().asString()) {
case Config.RESOLVER_IN_PROCESS:
this.flagResolver = new InProcessResolver(options, this::isConnected,
this::onResolverConnectionChanged);
this::onConnectionEvent);
break;
case Config.RESOLVER_RPC:
this.flagResolver = new GrpcResolver(options,
new Cache(options.getCacheType(), options.getMaxCacheSize()),
this::isConnected,
this::onResolverConnectionChanged);
this::onConnectionEvent);
break;
default:
throw new IllegalStateException(
Expand Down Expand Up @@ -117,6 +120,19 @@ public ProviderEvaluation<Value> getObjectEvaluation(String key, Value defaultVa
return this.flagResolver.objectEvaluation(key, defaultValue, mergeContext(ctx));
}

/**
* An unmodifiable view of an object map representing the latest result of the
* SyncMetadata.
* Set on initial connection and updated with every reconnection.
* see:
* https://buf.build/open-feature/flagd/docs/main:flagd.sync.v1#flagd.sync.v1.FlagSyncService.GetMetadata
*
* @return Object map representing sync metadata
*/
protected Map<String, Object> getSyncMetadata() {
return Collections.unmodifiableMap(syncMetadata);
}

private EvaluationContext mergeContext(final EvaluationContext clientCallCtx) {
if (this.evaluationContext != null) {
return evaluationContext.merge(clientCallCtx);
Expand All @@ -129,10 +145,12 @@ private boolean isConnected() {
return this.connected;
}

private void onResolverConnectionChanged(boolean newConnectedState, List<String> changedFlagKeys) {
private void onConnectionEvent(boolean newConnectedState, List<String> changedFlagKeys,
Map<String, Object> syncMetadata) {
boolean previous = connected;
boolean current = newConnectedState;
this.connected = newConnectedState;
this.syncMetadata = syncMetadata;

// configuration changed
if (initialized && previous && current) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import dev.openfeature.sdk.Value;

/**
* A generic flag resolving contract for flagd.
* Abstraction that resolves flag values in from some source.
*/
public interface Resolver {
void init() throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,25 @@
@Slf4j
@SuppressFBWarnings(justification = "cache needs to be read and write by multiple objects")
class EventStreamObserver implements StreamObserver<EventStreamResponse> {
private final BiConsumer<Boolean, List<String>> stateConsumer;
private final BiConsumer<Boolean, List<String>> onConnectionEvent;
private final Object sync;
private final Cache cache;

private static final String CONFIGURATION_CHANGE = "configuration_change";
private static final String PROVIDER_READY = "provider_ready";
public static final String CONFIGURATION_CHANGE = "configuration_change";
public static final String PROVIDER_READY = "provider_ready";
static final String FLAGS_KEY = "flags";

/**
* Create a gRPC stream that get notified about flag changes.
*
* @param sync synchronization object from caller
* @param cache cache to update
* @param stateConsumer lambda to call for setting the state
* @param onResponse lambda to call to handle the response
*/
EventStreamObserver(Object sync, Cache cache, BiConsumer<Boolean, List<String>> stateConsumer) {
EventStreamObserver(Object sync, Cache cache, BiConsumer<Boolean, List<String>> onResponse) {
this.sync = sync;
this.cache = cache;
this.stateConsumer = stateConsumer;
this.onConnectionEvent = onResponse;
}

@Override
Expand All @@ -61,7 +61,7 @@ public void onError(Throwable t) {
if (this.cache.getEnabled()) {
this.cache.clear();
}
this.stateConsumer.accept(false, Collections.emptyList());
this.onConnectionEvent.accept(false, Collections.emptyList());

// handle last call of this stream
handleEndOfStream();
Expand All @@ -72,7 +72,7 @@ public void onCompleted() {
if (this.cache.getEnabled()) {
this.cache.clear();
}
this.stateConsumer.accept(false, Collections.emptyList());
this.onConnectionEvent.accept(false, Collections.emptyList());

// handle last call of this stream
handleEndOfStream();
Expand All @@ -99,11 +99,11 @@ private void handleConfigurationChangeEvent(EventStreamResponse value) {
}
}

this.stateConsumer.accept(true, changedFlags);
this.onConnectionEvent.accept(true, changedFlags);
}

private void handleProviderReadyEvent() {
this.stateConsumer.accept(true, Collections.emptyList());
this.onConnectionEvent.accept(true, Collections.emptyList());
if (this.cache.getEnabled()) {
this.cache.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

import dev.openfeature.contrib.providers.flagd.FlagdOptions;
Expand All @@ -14,6 +14,7 @@
import dev.openfeature.flagd.grpc.evaluation.Evaluation.EventStreamRequest;
import dev.openfeature.flagd.grpc.evaluation.Evaluation.EventStreamResponse;
import dev.openfeature.flagd.grpc.evaluation.ServiceGrpc;
import dev.openfeature.sdk.internal.TriConsumer;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
Expand All @@ -37,7 +38,7 @@ public class GrpcConnector {
private final long deadline;

private final Cache cache;
private final BiConsumer<Boolean, List<String>> stateConsumer;
private final TriConsumer<Boolean, List<String>, Map<String, Object>> onConnectionEvent;
private final Supplier<Boolean> connectedSupplier;

private int eventStreamAttempt = 1;
Expand All @@ -48,23 +49,23 @@ public class GrpcConnector {

/**
* GrpcConnector creates an abstraction over gRPC communication.
*
* @param options options to build the gRPC channel.
* @param cache cache to use.
* @param stateConsumer lambda to call for setting the state.
*
* @param options flagd options
* @param cache cache to use
* @param connectedSupplier lambda providing current connection status from caller
* @param onConnectionEvent lambda which handles changes in the connection/stream
*/
public GrpcConnector(final FlagdOptions options, final Cache cache, final Supplier<Boolean> connectedSupplier,
BiConsumer<Boolean, List<String>> stateConsumer) {
TriConsumer<Boolean, List<String>, Map<String, Object>> onConnectionEvent) {
this.channel = ChannelBuilder.nettyChannel(options);
this.serviceStub = ServiceGrpc.newStub(channel);
this.serviceBlockingStub = ServiceGrpc.newBlockingStub(channel);

this.maxEventStreamRetries = options.getMaxEventStreamRetries();
this.startEventStreamRetryBackoff = options.getRetryBackoffMs();
this.eventStreamRetryBackoff = options.getRetryBackoffMs();
this.deadline = options.getDeadline();
this.cache = cache;
this.stateConsumer = stateConsumer;
this.onConnectionEvent = onConnectionEvent;
this.connectedSupplier = connectedSupplier;
}

Expand Down Expand Up @@ -104,7 +105,7 @@ public void shutdown() throws Exception {
this.channel.awaitTermination(this.deadline, TimeUnit.MILLISECONDS);
log.warn(String.format("Unable to shut down channel by %d deadline", this.deadline));
}
this.stateConsumer.accept(false, Collections.emptyList());
this.onConnectionEvent.accept(false, Collections.emptyList(), Collections.emptyMap());
}
}

Expand All @@ -124,7 +125,7 @@ public ServiceGrpc.ServiceBlockingStub getResolver() {
private void observeEventStream() {
while (this.eventStreamAttempt <= this.maxEventStreamRetries) {
final StreamObserver<EventStreamResponse> responseObserver = new EventStreamObserver(sync, this.cache,
this::grpcStateConsumer);
this::grpconConnectionEvent);
this.serviceStub.eventStream(EventStreamRequest.getDefaultInstance(), responseObserver);

try {
Expand Down Expand Up @@ -155,16 +156,16 @@ private void observeEventStream() {
}

log.error("failed to connect to event stream, exhausted retries");
this.grpcStateConsumer(false, null);
this.grpconConnectionEvent(false, Collections.emptyList());
}

private void grpcStateConsumer(final boolean connected, final List<String> changedFlags) {
private void grpconConnectionEvent(final boolean connected, final List<String> changedFlags) {
// reset reconnection states
if (connected) {
this.eventStreamAttempt = 1;
this.eventStreamRetryBackoff = this.startEventStreamRetryBackoff;
}
// chain to initiator
this.stateConsumer.accept(connected, changedFlags);
this.onConnectionEvent.accept(connected, changedFlags, Collections.emptyMap());
}
}
Loading

0 comments on commit 59b6474

Please sign in to comment.