From 470717d6f21f2a9914db9a486c8fa1b147fcb356 Mon Sep 17 00:00:00 2001 From: Todd Baert Date: Wed, 25 Sep 2024 10:16:44 -0400 Subject: [PATCH] feat: expose sync-metadata on provider Signed-off-by: Todd Baert --- .../providers/flagd/FlagdProvider.java | 24 +- .../providers/flagd/resolver/Resolver.java | 2 +- .../resolver/grpc/EventStreamObserver.java | 20 +- .../flagd/resolver/grpc/GrpcConnector.java | 29 +- .../flagd/resolver/grpc/GrpcResolver.java | 54 +- .../resolver/process/InProcessResolver.java | 36 +- .../resolver/process/storage/FlagStore.java | 8 +- .../process/storage/StorageStateChange.java | 35 +- .../process/storage/connector/Connector.java | 4 +- .../{StreamPayload.java => QueuePayload.java} | 8 +- ...PayloadType.java => QueuePayloadType.java} | 2 +- .../storage/connector/file/FileConnector.java | 23 +- .../connector/grpc/GrpcStreamConnector.java | 78 +- .../providers/flagd/FlagdProviderTest.java | 62 +- .../resolver/grpc/GrpcConnectorTest.java | 72 +- .../process/InProcessResolverTest.java | 677 +++++++++--------- .../process/storage/FlagStoreTest.java | 21 +- .../process/storage/MockConnector.java | 13 +- .../connector/file/FileConnectorTest.java | 28 +- .../grpc/GrpcStreamConnectorTest.java | 52 +- 20 files changed, 730 insertions(+), 518 deletions(-) rename providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/{StreamPayload.java => QueuePayload.java} (55%) rename providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/{StreamPayloadType.java => QueuePayloadType.java} (83%) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java index a396d00f8..3532bf3eb 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java @@ -1,6 +1,8 @@ package dev.openfeature.contrib.providers.flagd; +import java.util.Collections; import java.util.List; +import java.util.Map; import dev.openfeature.contrib.providers.flagd.resolver.Resolver; import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver; @@ -24,6 +26,7 @@ public class FlagdProvider extends EventProvider { private final Resolver flagResolver; private volatile boolean initialized = false; private volatile boolean connected = false; + private volatile Map syncMetadata = Collections.emptyMap(); private EvaluationContext evaluationContext; @@ -47,13 +50,13 @@ public FlagdProvider(final FlagdOptions options) { switch (options.getResolverType().asString()) { case Config.RESOLVER_IN_PROCESS: this.flagResolver = new InProcessResolver(options, this::isConnected, - this::onResolverConnectionChanged); + this::onConnectionEvent); break; case Config.RESOLVER_RPC: this.flagResolver = new GrpcResolver(options, new Cache(options.getCacheType(), options.getMaxCacheSize()), this::isConnected, - this::onResolverConnectionChanged); + this::onConnectionEvent); break; default: throw new IllegalStateException( @@ -117,6 +120,19 @@ public ProviderEvaluation getObjectEvaluation(String key, Value defaultVa return this.flagResolver.objectEvaluation(key, defaultValue, mergeContext(ctx)); } + /** + * An unmodifiable view of an object map representing the latest result of the + * SyncMetadata. + * Set on initial connection and updated with every reconnection. + * see: + * https://buf.build/open-feature/flagd/docs/main:flagd.sync.v1#flagd.sync.v1.FlagSyncService.GetMetadata + * + * @return Object map representing sync metadata + */ + protected Map getSyncMetadata() { + return Collections.unmodifiableMap(syncMetadata); + } + private EvaluationContext mergeContext(final EvaluationContext clientCallCtx) { if (this.evaluationContext != null) { return evaluationContext.merge(clientCallCtx); @@ -129,10 +145,12 @@ private boolean isConnected() { return this.connected; } - private void onResolverConnectionChanged(boolean newConnectedState, List changedFlagKeys) { + private void onConnectionEvent(boolean newConnectedState, List changedFlagKeys, + Map syncMetadata) { boolean previous = connected; boolean current = newConnectedState; this.connected = newConnectedState; + this.syncMetadata = syncMetadata; // configuration changed if (initialized && previous && current) { diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/Resolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/Resolver.java index 8976a6789..45d82b66b 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/Resolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/Resolver.java @@ -5,7 +5,7 @@ import dev.openfeature.sdk.Value; /** - * A generic flag resolving contract for flagd. + * Abstraction that resolves flag values in from some source. */ public interface Resolver { void init() throws Exception; diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserver.java index ed279da8e..856cf0b63 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserver.java @@ -20,12 +20,12 @@ @Slf4j @SuppressFBWarnings(justification = "cache needs to be read and write by multiple objects") class EventStreamObserver implements StreamObserver { - private final BiConsumer> stateConsumer; + private final BiConsumer> onConnectionEvent; private final Object sync; private final Cache cache; - private static final String CONFIGURATION_CHANGE = "configuration_change"; - private static final String PROVIDER_READY = "provider_ready"; + public static final String CONFIGURATION_CHANGE = "configuration_change"; + public static final String PROVIDER_READY = "provider_ready"; static final String FLAGS_KEY = "flags"; /** @@ -33,12 +33,12 @@ class EventStreamObserver implements StreamObserver { * * @param sync synchronization object from caller * @param cache cache to update - * @param stateConsumer lambda to call for setting the state + * @param onResponse lambda to call to handle the response */ - EventStreamObserver(Object sync, Cache cache, BiConsumer> stateConsumer) { + EventStreamObserver(Object sync, Cache cache, BiConsumer> onResponse) { this.sync = sync; this.cache = cache; - this.stateConsumer = stateConsumer; + this.onConnectionEvent = onResponse; } @Override @@ -61,7 +61,7 @@ public void onError(Throwable t) { if (this.cache.getEnabled()) { this.cache.clear(); } - this.stateConsumer.accept(false, Collections.emptyList()); + this.onConnectionEvent.accept(false, Collections.emptyList()); // handle last call of this stream handleEndOfStream(); @@ -72,7 +72,7 @@ public void onCompleted() { if (this.cache.getEnabled()) { this.cache.clear(); } - this.stateConsumer.accept(false, Collections.emptyList()); + this.onConnectionEvent.accept(false, Collections.emptyList()); // handle last call of this stream handleEndOfStream(); @@ -99,11 +99,11 @@ private void handleConfigurationChangeEvent(EventStreamResponse value) { } } - this.stateConsumer.accept(true, changedFlags); + this.onConnectionEvent.accept(true, changedFlags); } private void handleProviderReadyEvent() { - this.stateConsumer.accept(true, Collections.emptyList()); + this.onConnectionEvent.accept(true, Collections.emptyList()); if (this.cache.getEnabled()) { this.cache.clear(); } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnector.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnector.java index 563bad739..0ce3b05ef 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnector.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnector.java @@ -2,9 +2,9 @@ import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer; import java.util.function.Supplier; import dev.openfeature.contrib.providers.flagd.FlagdOptions; @@ -14,6 +14,7 @@ import dev.openfeature.flagd.grpc.evaluation.Evaluation.EventStreamRequest; import dev.openfeature.flagd.grpc.evaluation.Evaluation.EventStreamResponse; import dev.openfeature.flagd.grpc.evaluation.ServiceGrpc; +import dev.openfeature.sdk.internal.TriConsumer; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.grpc.ManagedChannel; import io.grpc.stub.StreamObserver; @@ -37,7 +38,7 @@ public class GrpcConnector { private final long deadline; private final Cache cache; - private final BiConsumer> stateConsumer; + private final TriConsumer, Map> onConnectionEvent; private final Supplier connectedSupplier; private int eventStreamAttempt = 1; @@ -48,23 +49,23 @@ public class GrpcConnector { /** * GrpcConnector creates an abstraction over gRPC communication. - * - * @param options options to build the gRPC channel. - * @param cache cache to use. - * @param stateConsumer lambda to call for setting the state. + * + * @param options flagd options + * @param cache cache to use + * @param connectedSupplier lambda providing current connection status from caller + * @param onConnectionEvent lambda which handles changes in the connection/stream */ public GrpcConnector(final FlagdOptions options, final Cache cache, final Supplier connectedSupplier, - BiConsumer> stateConsumer) { + TriConsumer, Map> onConnectionEvent) { this.channel = ChannelBuilder.nettyChannel(options); this.serviceStub = ServiceGrpc.newStub(channel); this.serviceBlockingStub = ServiceGrpc.newBlockingStub(channel); - this.maxEventStreamRetries = options.getMaxEventStreamRetries(); this.startEventStreamRetryBackoff = options.getRetryBackoffMs(); this.eventStreamRetryBackoff = options.getRetryBackoffMs(); this.deadline = options.getDeadline(); this.cache = cache; - this.stateConsumer = stateConsumer; + this.onConnectionEvent = onConnectionEvent; this.connectedSupplier = connectedSupplier; } @@ -104,7 +105,7 @@ public void shutdown() throws Exception { this.channel.awaitTermination(this.deadline, TimeUnit.MILLISECONDS); log.warn(String.format("Unable to shut down channel by %d deadline", this.deadline)); } - this.stateConsumer.accept(false, Collections.emptyList()); + this.onConnectionEvent.accept(false, Collections.emptyList(), Collections.emptyMap()); } } @@ -124,7 +125,7 @@ public ServiceGrpc.ServiceBlockingStub getResolver() { private void observeEventStream() { while (this.eventStreamAttempt <= this.maxEventStreamRetries) { final StreamObserver responseObserver = new EventStreamObserver(sync, this.cache, - this::grpcStateConsumer); + this::grpconConnectionEvent); this.serviceStub.eventStream(EventStreamRequest.getDefaultInstance(), responseObserver); try { @@ -155,16 +156,16 @@ private void observeEventStream() { } log.error("failed to connect to event stream, exhausted retries"); - this.grpcStateConsumer(false, null); + this.grpconConnectionEvent(false, Collections.emptyList()); } - private void grpcStateConsumer(final boolean connected, final List changedFlags) { + private void grpconConnectionEvent(final boolean connected, final List changedFlags) { // reset reconnection states if (connected) { this.eventStreamAttempt = 1; this.eventStreamRetryBackoff = this.startEventStreamRetryBackoff; } // chain to initiator - this.stateConsumer.accept(connected, changedFlags); + this.onConnectionEvent.accept(connected, changedFlags, Collections.emptyMap()); } } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcResolver.java index 7226d7257..e1edc5247 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcResolver.java @@ -3,7 +3,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -29,18 +28,21 @@ import dev.openfeature.sdk.ImmutableMetadata; import dev.openfeature.sdk.MutableStructure; import dev.openfeature.sdk.ProviderEvaluation; +import dev.openfeature.sdk.Structure; import dev.openfeature.sdk.Value; import dev.openfeature.sdk.exceptions.FlagNotFoundError; import dev.openfeature.sdk.exceptions.GeneralError; import dev.openfeature.sdk.exceptions.OpenFeatureError; import dev.openfeature.sdk.exceptions.ParseError; import dev.openfeature.sdk.exceptions.TypeMismatchError; +import dev.openfeature.sdk.internal.TriConsumer; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; /** - * FlagResolution resolves flags from flagd. + * Resolves flag values using https://buf.build/open-feature/flagd/docs/main:flagd.evaluation.v1. + * Flags are evaluated remotely. */ @SuppressWarnings("PMD.TooManyStaticImports") @SuppressFBWarnings(justification = "cache needs to be read and write by multiple objects") @@ -52,20 +54,20 @@ public final class GrpcResolver implements Resolver { private final Supplier connectedSupplier; /** - * Initialize Grpc resolver. - * - * @param options flagd options. - * @param cache cache to use. - * @param connectedSupplier lambda to call for getting the state. - * @param onResolverConnectionChanged lambda to communicate back the state. + * Resolves flag values using https://buf.build/open-feature/flagd/docs/main:flagd.evaluation.v1. + * Flags are evaluated remotely. + * + * @param options flagd options + * @param cache cache to use + * @param connectedSupplier lambda providing current connection status from caller + * @param onConnectionEvent lambda which handles changes in the connection/stream */ public GrpcResolver(final FlagdOptions options, final Cache cache, final Supplier connectedSupplier, - final BiConsumer> onResolverConnectionChanged) { + final TriConsumer, Map> onConnectionEvent) { this.cache = cache; this.connectedSupplier = connectedSupplier; - this.strategy = ResolveFactory.getStrategy(options); - this.connector = new GrpcConnector(options, cache, connectedSupplier, onResolverConnectionChanged); + this.connector = new GrpcConnector(options, cache, connectedSupplier, onConnectionEvent); } /** @@ -203,14 +205,14 @@ private Boolean cacheAvailable() { /** * Recursively convert protobuf structure to openfeature value. */ - private static Value convertObjectResponse(Struct protobuf) { + public static Value convertObjectResponse(Struct protobuf) { return convertProtobufMap(protobuf.getFieldsMap()); } /** * Recursively convert the Evaluation context to a protobuf structure. */ - private static Struct convertContext(EvaluationContext ctx) { + public static Struct convertContext(EvaluationContext ctx) { Map ctxMap = ctx.asMap(); // asMap() does not provide explicitly set targeting key (ex:- new // ImmutableContext("TargetingKey") ). @@ -223,7 +225,7 @@ private static Struct convertContext(EvaluationContext ctx) { /** * Convert any openfeature value to a protobuf value. */ - private static com.google.protobuf.Value convertAny(Value value) { + public static com.google.protobuf.Value convertAny(Value value) { if (value.isList()) { return convertList(value.asList()); } else if (value.isStructure()) { @@ -236,7 +238,7 @@ private static com.google.protobuf.Value convertAny(Value value) { /** * Convert any protobuf value to {@link Value}. */ - private static Value convertAny(com.google.protobuf.Value protobuf) { + public static Value convertAny(com.google.protobuf.Value protobuf) { if (protobuf.hasListValue()) { return convertList(protobuf.getListValue()); } else if (protobuf.hasStructValue()) { @@ -249,7 +251,7 @@ private static Value convertAny(com.google.protobuf.Value protobuf) { /** * Convert OpenFeature map to protobuf {@link com.google.protobuf.Value}. */ - private static com.google.protobuf.Value convertMap(Map map) { + public static com.google.protobuf.Value convertMap(Map map) { Map values = new HashMap<>(); map.keySet().forEach((String key) -> { @@ -265,20 +267,28 @@ private static com.google.protobuf.Value convertMap(Map map) { * Convert protobuf map with {@link com.google.protobuf.Value} to OpenFeature * map. */ - private static Value convertProtobufMap(Map map) { + public static Value convertProtobufMap(Map map) { + return new Value(convertProtobufMapToStructure(map)); + } + + /** + * Convert protobuf map with {@link com.google.protobuf.Value} to OpenFeature + * map. + */ + public static Structure convertProtobufMapToStructure(Map map) { Map values = new HashMap<>(); map.keySet().forEach((String key) -> { com.google.protobuf.Value value = map.get(key); values.put(key, convertAny(value)); }); - return new Value(new MutableStructure(values)); + return new MutableStructure(values); } /** * Convert OpenFeature list to protobuf {@link com.google.protobuf.Value}. */ - private static com.google.protobuf.Value convertList(List values) { + public static com.google.protobuf.Value convertList(List values) { ListValue list = ListValue.newBuilder() .addAllValues(values.stream() .map(v -> convertAny(v)).collect(Collectors.toList())) @@ -289,7 +299,7 @@ private static com.google.protobuf.Value convertList(List values) { /** * Convert protobuf list to OpenFeature {@link com.google.protobuf.Value}. */ - private static Value convertList(ListValue protobuf) { + public static Value convertList(ListValue protobuf) { return new Value(protobuf.getValuesList().stream().map(p -> convertAny(p)).collect(Collectors.toList())); } @@ -297,7 +307,7 @@ private static Value convertList(ListValue protobuf) { * Convert OpenFeature {@link Value} to protobuf * {@link com.google.protobuf.Value}. */ - private static com.google.protobuf.Value convertPrimitive(Value value) { + public static com.google.protobuf.Value convertPrimitive(Value value) { com.google.protobuf.Value.Builder builder = com.google.protobuf.Value.newBuilder(); if (value.isBoolean()) { @@ -316,7 +326,7 @@ private static com.google.protobuf.Value convertPrimitive(Value value) { * Convert protobuf {@link com.google.protobuf.Value} to OpenFeature * {@link Value}. */ - private static Value convertPrimitive(com.google.protobuf.Value protobuf) { + public static Value convertPrimitive(com.google.protobuf.Value protobuf) { final Value value; if (protobuf.hasBoolValue()) { value = new Value(protobuf.getBoolValue()); diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java index 06e641736..2affac522 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java @@ -2,8 +2,9 @@ import static dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag.EMPTY_TARGETING_STRING; +import java.util.Collections; import java.util.List; -import java.util.function.BiConsumer; +import java.util.Map; import java.util.function.Supplier; import dev.openfeature.contrib.providers.flagd.FlagdOptions; @@ -26,33 +27,39 @@ import dev.openfeature.sdk.Value; import dev.openfeature.sdk.exceptions.ParseError; import dev.openfeature.sdk.exceptions.TypeMismatchError; +import dev.openfeature.sdk.internal.TriConsumer; import lombok.extern.slf4j.Slf4j; /** - * flagd in-process resolver. Resolves feature flags in-process. Flags are - * retrieved from {@link Storage}, where the - * {@link Storage} maintain flag configurations obtained from known source. + * Resolves flag values using + * https://buf.build/open-feature/flagd/docs/main:flagd.sync.v1. + * Flags are evaluated locally. */ @Slf4j public class InProcessResolver implements Resolver { private final Storage flagStore; - private final BiConsumer> onResolverConnectionChanged; + private final TriConsumer, Map> onConnectionEvent; private final Operator operator; private final long deadline; private final ImmutableMetadata metadata; private final Supplier connectedSupplier; /** - * Initialize an in-process resolver. - * @param options flagd options - * @param connectedSupplier supplier for connection state - * @param onResolverConnectionChanged handler for connection change + * Resolves flag values using + * https://buf.build/open-feature/flagd/docs/main:flagd.sync.v1. + * Flags are evaluated locally. + * + * @param options flagd options + * @param connectedSupplier lambda providing current connection status from + * caller + * @param onConnectionEvent lambda which handles changes in the + * connection/stream */ public InProcessResolver(FlagdOptions options, final Supplier connectedSupplier, - BiConsumer> onResolverConnectionChanged) { + TriConsumer, Map> onConnectionEvent) { this.flagStore = new FlagStore(getConnector(options)); this.deadline = options.getDeadline(); - this.onResolverConnectionChanged = onResolverConnectionChanged; + this.onConnectionEvent = onConnectionEvent; this.operator = new Operator(); this.connectedSupplier = connectedSupplier; this.metadata = options.getSelector() == null ? null @@ -72,10 +79,11 @@ public void init() throws Exception { final StorageStateChange storageStateChange = flagStore.getStateQueue().take(); switch (storageStateChange.getStorageState()) { case OK: - onResolverConnectionChanged.accept(true, storageStateChange.getChangedFlagsKeys()); + onConnectionEvent.accept(true, storageStateChange.getChangedFlagsKeys(), + storageStateChange.getSyncMetadata()); break; case ERROR: - onResolverConnectionChanged.accept(false, null); + onConnectionEvent.accept(false, Collections.emptyList(), Collections.emptyMap()); break; default: log.info(String.format("Storage emitted unhandled status: %s", @@ -101,7 +109,7 @@ public void init() throws Exception { */ public void shutdown() throws InterruptedException { flagStore.shutdown(); - onResolverConnectionChanged.accept(false, null); + onConnectionEvent.accept(false, Collections.emptyList(), Collections.emptyMap()); } /** diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java index 3b4c03113..fd32a3d97 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java @@ -3,7 +3,7 @@ import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag; import dev.openfeature.contrib.providers.flagd.resolver.process.model.FlagParser; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector; -import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload; +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import lombok.extern.slf4j.Slf4j; import java.util.HashMap; @@ -94,15 +94,15 @@ public BlockingQueue getStateQueue() { } private void streamerListener(final Connector connector) throws InterruptedException { - final BlockingQueue streamPayloads = connector.getStream(); + final BlockingQueue streamPayloads = connector.getStream(); while (!shutdown.get()) { - final StreamPayload take = streamPayloads.take(); + final QueuePayload take = streamPayloads.take(); switch (take.getType()) { case DATA: try { List changedFlagsKeys; - Map flagMap = FlagParser.parseString(take.getData(), throwIfInvalid); + Map flagMap = FlagParser.parseString(take.getFlagData(), throwIfInvalid); writeLock.lock(); try { changedFlagsKeys = getChangedFlagsKeys(flagMap); diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageStateChange.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageStateChange.java index cf85b0432..042bd082b 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageStateChange.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/StorageStateChange.java @@ -1,13 +1,13 @@ package dev.openfeature.contrib.providers.flagd.resolver.process.storage; +import java.util.Collections; +import java.util.List; +import java.util.Map; + import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.ToString; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - /** * Represents a change in the stored flags. */ @@ -17,14 +17,39 @@ public class StorageStateChange { private final StorageState storageState; private final List changedFlagsKeys; + private final Map syncMetadata; + + /** + * Construct a new StorageStateChange. + * @param storageState state of the storage + * @param changedFlagsKeys flags changed + * @param syncMetadata possibly updated metadata + */ + public StorageStateChange(StorageState storageState, List changedFlagsKeys, + Map syncMetadata) { + this.storageState = storageState; + this.changedFlagsKeys = Collections.unmodifiableList(changedFlagsKeys); + this.syncMetadata = Collections.unmodifiableMap(syncMetadata); + } + /** + * Construct a new StorageStateChange. + * @param storageState state of the storage + * @param changedFlagsKeys flags changed + */ public StorageStateChange(StorageState storageState, List changedFlagsKeys) { this.storageState = storageState; - this.changedFlagsKeys = new ArrayList<>(changedFlagsKeys); + this.changedFlagsKeys = Collections.unmodifiableList(changedFlagsKeys); + this.syncMetadata = Collections.emptyMap(); } + /** + * Construct a new StorageStateChange. + * @param storageState state of the storage + */ public StorageStateChange(StorageState storageState) { this.storageState = storageState; this.changedFlagsKeys = Collections.emptyList(); + this.syncMetadata = Collections.emptyMap(); } } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/Connector.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/Connector.java index 66ebf2c90..1a00737b5 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/Connector.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/Connector.java @@ -4,12 +4,12 @@ /** * Contract of the in-process storage connector. Connectors are responsible to stream flag configurations in - * {@link StreamPayload} format. + * {@link QueuePayload} format. */ public interface Connector { void init() throws Exception; - BlockingQueue getStream(); + BlockingQueue getStream(); void shutdown() throws InterruptedException; } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/StreamPayload.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayload.java similarity index 55% rename from providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/StreamPayload.java rename to providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayload.java index f2afc34b5..59f1cf48c 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/StreamPayload.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayload.java @@ -1,5 +1,6 @@ package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector; +import java.util.Map; import lombok.AllArgsConstructor; import lombok.Getter; @@ -8,7 +9,8 @@ */ @AllArgsConstructor @Getter -public class StreamPayload { - private final StreamPayloadType type; - private final String data; +public class QueuePayload { + private final QueuePayloadType type; + private final String flagData; + private final Map syncMetadata; } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/StreamPayloadType.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayloadType.java similarity index 83% rename from providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/StreamPayloadType.java rename to providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayloadType.java index dad6ba80d..4839dab51 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/StreamPayloadType.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayloadType.java @@ -3,7 +3,7 @@ /** * Payload type emitted by {@link Connector}. */ -public enum StreamPayloadType { +public enum QueuePayloadType { DATA, ERROR } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/file/FileConnector.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/file/FileConnector.java index 775b5a453..e955d863f 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/file/FileConnector.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/file/FileConnector.java @@ -1,19 +1,20 @@ package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.file; -import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector; -import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload; -import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayloadType; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import lombok.extern.slf4j.Slf4j; - import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Collections; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector; +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload; +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import lombok.extern.slf4j.Slf4j; + /** * File connector reads flag configurations from a given file, polls for changes and expose the content through * {@code Connector} contract. @@ -28,7 +29,7 @@ public class FileConnector implements Connector { private static final String OFFER_WARN = "Unable to offer file content to queue: queue is full"; private final String flagSourcePath; - private final BlockingQueue queue = new LinkedBlockingQueue<>(1); + private final BlockingQueue queue = new LinkedBlockingQueue<>(1); private boolean shutdown = false; public FileConnector(final String flagSourcePath) { @@ -45,7 +46,7 @@ public void init() throws IOException { // initial read String flagData = new String(Files.readAllBytes(filePath), StandardCharsets.UTF_8); - if (!queue.offer(new StreamPayload(StreamPayloadType.DATA, flagData))) { + if (!queue.offer(new QueuePayload(QueuePayloadType.DATA, flagData, Collections.emptyMap()))) { log.warn(OFFER_WARN); } @@ -58,7 +59,7 @@ public void init() throws IOException { if (currentTS > lastTS) { lastTS = currentTS; flagData = new String(Files.readAllBytes(filePath), StandardCharsets.UTF_8); - if (!queue.offer(new StreamPayload(StreamPayloadType.DATA, flagData))) { + if (!queue.offer(new QueuePayload(QueuePayloadType.DATA, flagData, Collections.emptyMap()))) { log.warn(OFFER_WARN); } } @@ -72,7 +73,7 @@ public void init() throws IOException { Thread.currentThread().interrupt(); } catch (Throwable t) { log.error("Error from file connector. File connector will exit", t); - if (!queue.offer(new StreamPayload(StreamPayloadType.ERROR, t.toString()))) { + if (!queue.offer(new QueuePayload(QueuePayloadType.ERROR, t.toString(), null))) { log.warn(OFFER_WARN); } } @@ -86,7 +87,7 @@ public void init() throws IOException { /** * Expose the queue to fulfil the {@code Connector} contract. */ - public BlockingQueue getStream() { + public BlockingQueue getStream() { return queue; } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java index 4e28f8bdf..7e089111d 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java @@ -1,24 +1,30 @@ package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc; +import java.util.Collections; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + import dev.openfeature.contrib.providers.flagd.FlagdOptions; import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder; +import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector; -import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload; -import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayloadType; +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload; +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType; import dev.openfeature.flagd.grpc.sync.FlagSyncServiceGrpc; +import dev.openfeature.flagd.grpc.sync.FlagSyncServiceGrpc.FlagSyncServiceBlockingStub; import dev.openfeature.flagd.grpc.sync.FlagSyncServiceGrpc.FlagSyncServiceStub; +import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataRequest; +import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse; import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsRequest; import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsResponse; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.grpc.ManagedChannel; import lombok.extern.slf4j.Slf4j; -import java.util.Random; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - /** * Implements the {@link Connector} contract and emit flags obtained from flagd * sync gRPC contract. @@ -35,10 +41,11 @@ public class GrpcStreamConnector implements Connector { private static final int QUEUE_SIZE = 5; private final AtomicBoolean shutdown = new AtomicBoolean(false); - private final BlockingQueue blockingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); + private final BlockingQueue blockingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); private final ManagedChannel channel; - private final FlagSyncServiceGrpc.FlagSyncServiceStub serviceStub; + private final FlagSyncServiceStub serviceStub; + private final FlagSyncServiceBlockingStub serviceBlockingStub; private final int deadline; private final String selector; @@ -50,6 +57,7 @@ public class GrpcStreamConnector implements Connector { public GrpcStreamConnector(final FlagdOptions options) { channel = ChannelBuilder.nettyChannel(options); serviceStub = FlagSyncServiceGrpc.newStub(channel); + serviceBlockingStub = FlagSyncServiceGrpc.newBlockingStub(channel); deadline = options.getDeadline(); selector = options.getSelector(); } @@ -60,13 +68,7 @@ public GrpcStreamConnector(final FlagdOptions options) { public void init() { Thread listener = new Thread(() -> { try { - final SyncFlagsRequest.Builder requestBuilder = SyncFlagsRequest.newBuilder(); - - if (selector != null) { - requestBuilder.setSelector(selector); - } - - observeEventStream(blockingQueue, shutdown, serviceStub, requestBuilder.build()); + observeEventStream(blockingQueue, shutdown, serviceStub, serviceBlockingStub, selector); } catch (InterruptedException e) { log.warn("gRPC event stream interrupted, flag configurations are stale", e); Thread.currentThread().interrupt(); @@ -80,7 +82,7 @@ public void init() { /** * Get blocking queue to obtain payloads exposed by this connector. */ - public BlockingQueue getStream() { + public BlockingQueue getStream() { return blockingQueue; } @@ -111,10 +113,11 @@ public void shutdown() throws InterruptedException { /** * Contains blocking calls, to be used concurrently. */ - static void observeEventStream(final BlockingQueue writeTo, + static void observeEventStream(final BlockingQueue writeTo, final AtomicBoolean shutdown, final FlagSyncServiceStub serviceStub, - final SyncFlagsRequest request) + final FlagSyncServiceBlockingStub serviceBlockingStub, + final String selector) throws InterruptedException { final BlockingQueue streamReceiver = new LinkedBlockingQueue<>(QUEUE_SIZE); @@ -123,8 +126,28 @@ static void observeEventStream(final BlockingQueue writeTo, log.info("Initializing sync stream observer"); while (!shutdown.get()) { + Exception metadataException = null; log.debug("Initializing sync stream request"); - serviceStub.syncFlags(request, new GrpcStreamHandler(streamReceiver)); + final SyncFlagsRequest.Builder syncRequest = SyncFlagsRequest.newBuilder(); + final GetMetadataRequest.Builder metadataRequest = GetMetadataRequest.newBuilder(); + Map metadata = Collections.emptyMap(); + + if (selector != null) { + syncRequest.setSelector(selector); + } + + serviceStub.syncFlags(syncRequest.build(), new GrpcStreamHandler(streamReceiver)); + try { + GetMetadataResponse metadataResponse = serviceBlockingStub.getMetadata(metadataRequest.build()); + metadata = GrpcResolver + .convertProtobufMapToStructure(metadataResponse.getMetadata().getFieldsMap()).asObjectMap(); + } catch (Exception e) { + // the chances this call fails but the syncRequest does not are slim + // it could be that the server doesn't implement this RPC + // instead of logging here, retain the exception and only log if the + // streamReceiver doesn't error + metadataException = e; + } while (!shutdown.get()) { final GrpcResponseModel response = streamReceiver.take(); @@ -141,7 +164,8 @@ static void observeEventStream(final BlockingQueue writeTo, response.getError()); if (!writeTo.offer( - new StreamPayload(StreamPayloadType.ERROR, "Error from stream connection, retrying"))) { + new QueuePayload(QueuePayloadType.ERROR, "Error from stream connection, retrying", + metadata))) { log.error("Failed to convey ERROR status, queue is full"); } break; @@ -150,11 +174,18 @@ static void observeEventStream(final BlockingQueue writeTo, final SyncFlagsResponse flagsResponse = response.getSyncFlagsResponse(); String data = flagsResponse.getFlagConfiguration(); log.debug("Got stream response: " + data); + if (!writeTo.offer( - new StreamPayload(StreamPayloadType.DATA, data))) { + new QueuePayload(QueuePayloadType.DATA, data, metadata))) { log.error("Stream writing failed"); } + if (metadataException != null) { + // if we somehow are connected but the metadata call failed, something strange + // happened + log.error("Stream connected but getMetadata RPC failed", metadataException); + } + // reset retry delay if we succeeded in a retry attempt retryDelay = INIT_BACK_OFF; } @@ -177,4 +208,5 @@ static void observeEventStream(final BlockingQueue writeTo, // log as this can happen after awakened from backoff sleep log.info("Shutdown invoked, exiting event stream listener"); } + } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java index f1318ac4b..14b7779b8 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java @@ -11,6 +11,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockConstruction; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -20,6 +21,7 @@ import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -31,6 +33,7 @@ import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageStateChange; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.mockito.MockedConstruction; import org.mockito.MockedStatic; import org.mockito.Mockito; @@ -60,13 +63,16 @@ import dev.openfeature.sdk.MutableContext; import dev.openfeature.sdk.MutableStructure; import dev.openfeature.sdk.OpenFeatureAPI; +import dev.openfeature.sdk.ProviderEvaluation; import dev.openfeature.sdk.ProviderState; import dev.openfeature.sdk.Reason; import dev.openfeature.sdk.Structure; import dev.openfeature.sdk.Value; +import dev.openfeature.sdk.internal.TriConsumer; import io.cucumber.java.AfterAll; import io.grpc.Channel; import io.grpc.Deadline; +import lombok.val; class FlagdProviderTest { private static final String FLAG_KEY = "some-key"; @@ -502,14 +508,14 @@ void invalidate_cache() { final Cache cache = new Cache("lru", 5); class NoopInitGrpcConnector extends GrpcConnector { - public NoopInitGrpcConnector(FlagdOptions options, Cache cache, Supplier connectedSupplier, BiConsumer> onResolverConnectionChanged) { - super(options, cache, connectedSupplier, onResolverConnectionChanged); + public NoopInitGrpcConnector(FlagdOptions options, Cache cache, Supplier connectedSupplier, TriConsumer, Map> onConnectionEvent) { + super(options, cache, connectedSupplier, onConnectionEvent); } public void initialize() throws Exception {}; } - grpc = new NoopInitGrpcConnector(FlagdOptions.builder().build(), cache, () -> true, (state, changedFlagKeys) -> { + grpc = new NoopInitGrpcConnector(FlagdOptions.builder().build(), cache, () -> true, (state, changedFlagKeys, syncMetadata) -> { }); } @@ -719,15 +725,15 @@ void disabled_cache() { class NoopInitGrpcConnector extends GrpcConnector { public NoopInitGrpcConnector(FlagdOptions options, Cache cache, Supplier connectedSupplier, - BiConsumer> onResolverConnectionChanged) { - super(options, cache, connectedSupplier, onResolverConnectionChanged); + TriConsumer, Map> onConnectionEvent) { + super(options, cache, connectedSupplier, onConnectionEvent); } public void initialize() throws Exception { }; } - grpc = new NoopInitGrpcConnector(FlagdOptions.builder().build(), cache, () -> true, (state, changedFlagKeys) -> { + grpc = new NoopInitGrpcConnector(FlagdOptions.builder().build(), cache, () -> true, (state, changedFlagKeys, syncMetadata) -> { }); } @@ -845,6 +851,40 @@ void initializationAndShutdown() throws Exception { verify(resolverMock, times(1)).shutdown(); } + @Test + void updatesSyncMetadataWithCallback() throws Exception { + + final EvaluationContext ctx = new ImmutableContext(); + String key = "key1"; + String val = "val1"; + Map metadata = new HashMap<>(); + metadata.put(key, val); + + // mock a resolver + try (MockedConstruction mockResolver = mockConstruction(GrpcResolver.class, + (mock, context) -> { + TriConsumer, Map> onConnectionEvent; + + // get a reference to the onConnectionEvent callback + onConnectionEvent = (TriConsumer, Map>) context + .arguments().get(3); + + // when our mock resolver initializes, it runs the passed onConnectionEvent callback + doAnswer(invocation -> { + onConnectionEvent.accept(true, Collections.emptyList(), + metadata); + return null; + }).when(mock).init(); + })) { + + FlagdProvider provider = new FlagdProvider(); + provider.initialize(ctx); + + // the onConnectionEvent should have updated the sync metadata + assertEquals(val, provider.getSyncMetadata().get(key)); + } + } + // test helper // create provider with given grpc connector @@ -861,12 +901,12 @@ private FlagdProvider createProvider(GrpcConnector grpc, Supplier getCo // create provider with given grpc provider, cache and state supplier private FlagdProvider createProvider(GrpcConnector grpc, Cache cache, Supplier getConnected) { - final FlagdOptions flagdOptions = FlagdOptions.builder().build(); - final GrpcResolver grpcResolver = new GrpcResolver(flagdOptions, cache, getConnected, - (providerState, changedFlagKeys) -> { - }); + final FlagdOptions flagdOptions = FlagdOptions.builder().build(); + final GrpcResolver grpcResolver = new GrpcResolver(flagdOptions, cache, getConnected, + (providerState, changedFlagKeys, syncMetadata) -> { + }); - final FlagdProvider provider = new FlagdProvider(); + final FlagdProvider provider = new FlagdProvider(); try { Field connector = GrpcResolver.class.getDeclaredField("connector"); diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnectorTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnectorTest.java index 3325341ed..29051abe9 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnectorTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnectorTest.java @@ -3,6 +3,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; @@ -18,6 +19,8 @@ import static org.mockito.Mockito.when; import java.lang.reflect.Field; +import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.Test; @@ -27,12 +30,15 @@ import org.junit.jupiter.params.provider.ValueSource; import org.mockito.MockedConstruction; import org.mockito.MockedStatic; +import org.mockito.invocation.InvocationOnMock; import dev.openfeature.contrib.providers.flagd.FlagdOptions; import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache; +import dev.openfeature.flagd.grpc.evaluation.Evaluation.EventStreamResponse; import dev.openfeature.flagd.grpc.evaluation.ServiceGrpc; import dev.openfeature.flagd.grpc.evaluation.ServiceGrpc.ServiceBlockingStub; import dev.openfeature.flagd.grpc.evaluation.ServiceGrpc.ServiceStub; +import dev.openfeature.sdk.internal.TriConsumer; import io.grpc.Channel; import io.grpc.netty.NettyChannelBuilder; import io.netty.channel.EventLoopGroup; @@ -43,7 +49,7 @@ public class GrpcConnectorTest { @ParameterizedTest - @ValueSource(ints = {1, 2, 3}) + @ValueSource(ints = { 1, 2, 3 }) void validate_retry_calls(int retries) throws NoSuchFieldException, IllegalAccessException { final int backoffMs = 100; @@ -58,8 +64,9 @@ void validate_retry_calls(int retries) throws NoSuchFieldException, IllegalAcces final ServiceGrpc.ServiceStub mockStub = mock(ServiceGrpc.ServiceStub.class); doAnswer(invocation -> null).when(mockStub).eventStream(any(), any()); - final GrpcConnector connector = new GrpcConnector(options, cache, () -> true, (state,changedFlagKeys) -> { - }); + final GrpcConnector connector = new GrpcConnector(options, cache, () -> true, + (state, changedFlagKeys, syncMetadata) -> { + }); Field serviceStubField = GrpcConnector.class.getDeclaredField("serviceStub"); serviceStubField.setAccessible(true); @@ -90,29 +97,52 @@ void validate_retry_calls(int retries) throws NoSuchFieldException, IllegalAcces @Test void initialization_succeed_with_connected_status() throws NoSuchFieldException, IllegalAccessException { final Cache cache = new Cache("disabled", 0); - final ServiceGrpc.ServiceStub mockStub = mock(ServiceGrpc.ServiceStub.class); - doAnswer(invocation -> null).when(mockStub).eventStream(any(), any()); + TriConsumer, Map> onConnectionEvent = mock(TriConsumer.class); + doAnswer((InvocationOnMock invocation) -> { + EventStreamObserver eventStreamObserver = (EventStreamObserver) invocation.getArgument(1); + eventStreamObserver + .onNext(EventStreamResponse.newBuilder().setType(EventStreamObserver.PROVIDER_READY).build()); + return null; + }).when(mockStub).eventStream(any(), any()); - // pass true in connected lambda - final GrpcConnector connector = new GrpcConnector(FlagdOptions.builder().build(), cache, () -> true, (state, changedFlagKeys) -> { - }); + try (MockedStatic mockStaticService = mockStatic(ServiceGrpc.class)) { + mockStaticService.when(() -> ServiceGrpc.newStub(any())) + .thenReturn(mockStub); - assertDoesNotThrow(connector::initialize); + // pass true in connected lambda + final GrpcConnector connector = new GrpcConnector(FlagdOptions.builder().build(), cache, () -> { + try { + Thread.sleep(100); + return true; + } catch (Exception e) { + } + return false; + + }, + onConnectionEvent); + + assertDoesNotThrow(connector::initialize); + + // assert that onConnectionEvent was called with true + verify(onConnectionEvent).accept(argThat(arg -> arg), any(), any()); + } } @Test void initialization_fail_with_timeout() throws Exception { final Cache cache = new Cache("disabled", 0); - final ServiceGrpc.ServiceStub mockStub = mock(ServiceGrpc.ServiceStub.class); + TriConsumer, Map> onConnectionEvent = mock(TriConsumer.class); doAnswer(invocation -> null).when(mockStub).eventStream(any(), any()); - // pass false in connected lambda - final GrpcConnector connector = new GrpcConnector(FlagdOptions.builder().build(), cache, () -> false, (state, changedFlagKeys) -> { - }); + final GrpcConnector connector = new GrpcConnector(FlagdOptions.builder().build(), cache, () -> false, + onConnectionEvent); + // assert throws assertThrows(RuntimeException.class, connector::initialize); + // assert that onConnectionEvent was called with false + verify(onConnectionEvent).accept(argThat(arg -> !arg), any(), any()); } @Test @@ -170,17 +200,16 @@ void no_args_host_and_port_env_set_should_build_tcp_socket() throws Exception { new GrpcConnector(FlagdOptions.builder().build(), null, null, null); // verify host/port matches & called times(= 1 as we rely on reusable channel) - mockStaticChannelBuilder.verify(() -> NettyChannelBuilder. - forAddress(host, port), times(1)); + mockStaticChannelBuilder.verify(() -> NettyChannelBuilder.forAddress(host, port), times(1)); } } }); } - /** - * OS Specific test - This test is valid only on Linux system as it rely on epoll availability - * */ + * OS Specific test - This test is valid only on Linux system as it rely on + * epoll availability + */ @Test @EnabledOnOs(OS.LINUX) void path_arg_should_build_domain_socket_with_correct_path() { @@ -218,8 +247,9 @@ void path_arg_should_build_domain_socket_with_correct_path() { } /** - * OS Specific test - This test is valid only on Linux system as it rely on epoll availability - * */ + * OS Specific test - This test is valid only on Linux system as it rely on + * epoll availability + */ @Test @EnabledOnOs(OS.LINUX) void no_args_socket_env_should_build_domain_socket_with_correct_path() throws Exception { @@ -249,7 +279,7 @@ void no_args_socket_env_should_build_domain_socket_with_correct_path() throws Ex new GrpcConnector(FlagdOptions.builder().build(), null, null, null); - //verify path matches & called times(= 1 as we rely on reusable channel) + // verify path matches & called times(= 1 as we rely on reusable channel) mockStaticChannelBuilder.verify(() -> NettyChannelBuilder .forAddress(argThat((DomainSocketAddress d) -> { return d.path() == path; diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java index f3c2a5773..dfa2051f9 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java @@ -19,6 +19,8 @@ import dev.openfeature.sdk.exceptions.FlagNotFoundError; import dev.openfeature.sdk.exceptions.ParseError; import dev.openfeature.sdk.exceptions.TypeMismatchError; +import dev.openfeature.sdk.internal.TriConsumer; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -51,397 +53,404 @@ class InProcessResolverTest { - @Test - public void connectorSetup() { - // given - FlagdOptions forGrpcOptions = FlagdOptions.builder().resolverType(Config.Resolver.IN_PROCESS) - .host("localhost") - .port(8080).build(); - FlagdOptions forOfflineOptions = FlagdOptions.builder().resolverType(Config.Resolver.IN_PROCESS) - .offlineFlagSourcePath("path").build(); - FlagdOptions forCustomConnectorOptions = FlagdOptions.builder().resolverType(Config.Resolver.IN_PROCESS) - .customConnector(new MockConnector(null)).build(); - - // then - assertInstanceOf(GrpcStreamConnector.class, InProcessResolver.getConnector(forGrpcOptions)); - assertInstanceOf(FileConnector.class, InProcessResolver.getConnector(forOfflineOptions)); - assertInstanceOf(MockConnector.class, InProcessResolver.getConnector(forCustomConnectorOptions)); + @Test + public void connectorSetup() { + // given + FlagdOptions forGrpcOptions = FlagdOptions.builder().resolverType(Config.Resolver.IN_PROCESS) + .host("localhost") + .port(8080).build(); + FlagdOptions forOfflineOptions = FlagdOptions.builder().resolverType(Config.Resolver.IN_PROCESS) + .offlineFlagSourcePath("path").build(); + FlagdOptions forCustomConnectorOptions = FlagdOptions.builder().resolverType(Config.Resolver.IN_PROCESS) + .customConnector(new MockConnector(null)).build(); + + // then + assertInstanceOf(GrpcStreamConnector.class, InProcessResolver.getConnector(forGrpcOptions)); + assertInstanceOf(FileConnector.class, InProcessResolver.getConnector(forOfflineOptions)); + assertInstanceOf(MockConnector.class, InProcessResolver.getConnector(forCustomConnectorOptions)); + } + + @Test + public void eventHandling() throws Throwable { + // given + // note - queues with adequate capacity + final BlockingQueue sender = new LinkedBlockingQueue<>(5); + final BlockingQueue receiver = new LinkedBlockingQueue<>(5); + final String key = "key1"; + final String val = "val1"; + final Map syncMetadata = new HashMap<>(); + syncMetadata.put(key, val); + + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(new HashMap<>(), sender), + (connectedState, changedFlagKeys, sm) -> receiver.offer(new StorageStateChange( + connectedState ? StorageState.OK : StorageState.ERROR, changedFlagKeys, sm))); + + // when - init and emit events + Thread initThread = new Thread(() -> { + try { + inProcessResolver.init(); + } catch (Exception e) { + } + }); + initThread.start(); + if (!sender.offer(new StorageStateChange(StorageState.OK, Collections.emptyList(), syncMetadata), 100, + TimeUnit.MILLISECONDS)) { + Assertions.fail("failed to send the event"); + } + if (!sender.offer(new StorageStateChange(StorageState.ERROR), 100, TimeUnit.MILLISECONDS)) { + Assertions.fail("failed to send the event"); } - @Test - public void eventHandling() throws Throwable { - // given - // note - queues with adequate capacity - final BlockingQueue sender = new LinkedBlockingQueue<>(5); - final BlockingQueue receiver = new LinkedBlockingQueue<>(5); - - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(new HashMap<>(), sender), - (connectedState, changedFlagKeys) -> receiver.offer(connectedState)); - - // when - init and emit events - Thread initThread = new Thread(() -> { - try { - inProcessResolver.init(); - } catch (Exception e) { - } - }); - initThread.start(); - if (!sender.offer(new StorageStateChange(StorageState.OK, Collections.EMPTY_LIST), 100, - TimeUnit.MILLISECONDS)) { - Assertions.fail("failed to send the event"); - } - if (!sender.offer(new StorageStateChange(StorageState.ERROR), 100, TimeUnit.MILLISECONDS)) { - Assertions.fail("failed to send the event"); - } - - // then - receive events in order - assertTimeoutPreemptively(Duration.ofMillis(200), () -> { - Assertions.assertTrue(receiver.take()); + // then - receive events in order + assertTimeoutPreemptively(Duration.ofMillis(200), () -> { + StorageStateChange storageState = receiver.take(); + assertEquals(StorageState.OK, storageState.getStorageState()); + assertEquals(val, storageState.getSyncMetadata().get(key)); + }); + + assertTimeoutPreemptively(Duration.ofMillis(200), () -> { + assertEquals(StorageState.ERROR, receiver.take().getStorageState()); + }); + } + + @Test + public void simpleBooleanResolving() throws Exception { + // given + final Map flagMap = new HashMap<>(); + flagMap.put("booleanFlag", BOOLEAN_FLAG); + + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState, changedFlagKeys, syncMetadata) -> { }); - assertTimeoutPreemptively(Duration.ofMillis(200), () -> { - Assertions.assertFalse(receiver.take()); + // when + ProviderEvaluation providerEvaluation = inProcessResolver.booleanEvaluation("booleanFlag", + false, + new ImmutableContext()); + + // then + assertEquals(true, providerEvaluation.getValue()); + assertEquals("on", providerEvaluation.getVariant()); + assertEquals(Reason.STATIC.toString(), providerEvaluation.getReason()); + } + + @Test + public void simpleDoubleResolving() throws Exception { + // given + final Map flagMap = new HashMap<>(); + flagMap.put("doubleFlag", DOUBLE_FLAG); + + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState, changedFlagKeys, syncMetadata) -> { }); - } - - @Test - public void simpleBooleanResolving() throws Exception { - // given - final Map flagMap = new HashMap<>(); - flagMap.put("booleanFlag", BOOLEAN_FLAG); - - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), - (providerState, changedFlagKeys) -> { - }); - - // when - ProviderEvaluation providerEvaluation = inProcessResolver.booleanEvaluation("booleanFlag", - false, - new ImmutableContext()); - - // then - assertEquals(true, providerEvaluation.getValue()); - assertEquals("on", providerEvaluation.getVariant()); - assertEquals(Reason.STATIC.toString(), providerEvaluation.getReason()); - } - - @Test - public void simpleDoubleResolving() throws Exception { - // given - final Map flagMap = new HashMap<>(); - flagMap.put("doubleFlag", DOUBLE_FLAG); - - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), - (providerState, changedFlagKeys) -> { - }); - // when - ProviderEvaluation providerEvaluation = inProcessResolver.doubleEvaluation("doubleFlag", 0d, - new ImmutableContext()); - - // then - assertEquals(3.141d, providerEvaluation.getValue()); - assertEquals("one", providerEvaluation.getVariant()); - assertEquals(Reason.STATIC.toString(), providerEvaluation.getReason()); - } + // when + ProviderEvaluation providerEvaluation = inProcessResolver.doubleEvaluation("doubleFlag", 0d, + new ImmutableContext()); - @Test - public void fetchIntegerAsDouble() throws Exception { - // given - final Map flagMap = new HashMap<>(); - flagMap.put("doubleFlag", DOUBLE_FLAG); + // then + assertEquals(3.141d, providerEvaluation.getValue()); + assertEquals("one", providerEvaluation.getVariant()); + assertEquals(Reason.STATIC.toString(), providerEvaluation.getReason()); + } - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), - (providerState, changedFlagKeys) -> { - }); + @Test + public void fetchIntegerAsDouble() throws Exception { + // given + final Map flagMap = new HashMap<>(); + flagMap.put("doubleFlag", DOUBLE_FLAG); - // when - ProviderEvaluation providerEvaluation = inProcessResolver.integerEvaluation("doubleFlag", 0, - new ImmutableContext()); + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState, changedFlagKeys, syncMetadata) -> { + }); - // then - assertEquals(3, providerEvaluation.getValue()); - assertEquals("one", providerEvaluation.getVariant()); - assertEquals(Reason.STATIC.toString(), providerEvaluation.getReason()); - } + // when + ProviderEvaluation providerEvaluation = inProcessResolver.integerEvaluation("doubleFlag", 0, + new ImmutableContext()); - @Test - public void fetchDoubleAsInt() throws Exception { - // given - final Map flagMap = new HashMap<>(); - flagMap.put("integerFlag", INT_FLAG); + // then + assertEquals(3, providerEvaluation.getValue()); + assertEquals("one", providerEvaluation.getVariant()); + assertEquals(Reason.STATIC.toString(), providerEvaluation.getReason()); + } - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), - (providerState, changedFlagKeys) -> { - }); + @Test + public void fetchDoubleAsInt() throws Exception { + // given + final Map flagMap = new HashMap<>(); + flagMap.put("integerFlag", INT_FLAG); - // when - ProviderEvaluation providerEvaluation = inProcessResolver.doubleEvaluation("integerFlag", 0d, - new ImmutableContext()); + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState, changedFlagKeys, syncMetadata) -> { + }); - // then - assertEquals(1d, providerEvaluation.getValue()); - assertEquals("one", providerEvaluation.getVariant()); - assertEquals(Reason.STATIC.toString(), providerEvaluation.getReason()); - } + // when + ProviderEvaluation providerEvaluation = inProcessResolver.doubleEvaluation("integerFlag", 0d, + new ImmutableContext()); - @Test - public void simpleIntResolving() throws Exception { - // given - final Map flagMap = new HashMap<>(); - flagMap.put("integerFlag", INT_FLAG); + // then + assertEquals(1d, providerEvaluation.getValue()); + assertEquals("one", providerEvaluation.getVariant()); + assertEquals(Reason.STATIC.toString(), providerEvaluation.getReason()); + } - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), - (providerState, changedFlagKeys) -> { - }); + @Test + public void simpleIntResolving() throws Exception { + // given + final Map flagMap = new HashMap<>(); + flagMap.put("integerFlag", INT_FLAG); - // when - ProviderEvaluation providerEvaluation = inProcessResolver.integerEvaluation("integerFlag", 0, - new ImmutableContext()); + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState, changedFlagKeys, syncMetadata) -> { + }); - // then - assertEquals(1, providerEvaluation.getValue()); - assertEquals("one", providerEvaluation.getVariant()); - assertEquals(Reason.STATIC.toString(), providerEvaluation.getReason()); - } + // when + ProviderEvaluation providerEvaluation = inProcessResolver.integerEvaluation("integerFlag", 0, + new ImmutableContext()); - @Test - public void simpleObjectResolving() throws Exception { - // given - final Map flagMap = new HashMap<>(); - flagMap.put("objectFlag", OBJECT_FLAG); + // then + assertEquals(1, providerEvaluation.getValue()); + assertEquals("one", providerEvaluation.getVariant()); + assertEquals(Reason.STATIC.toString(), providerEvaluation.getReason()); + } - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), - (providerState, changedFlagKeys) -> { - }); + @Test + public void simpleObjectResolving() throws Exception { + // given + final Map flagMap = new HashMap<>(); + flagMap.put("objectFlag", OBJECT_FLAG); - Map typeDefault = new HashMap<>(); - typeDefault.put("key", "0164"); - typeDefault.put("date", "01.01.1990"); + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState, changedFlagKeys, syncMetadata) -> { + }); - // when - ProviderEvaluation providerEvaluation = inProcessResolver.objectEvaluation("objectFlag", - Value.objectToValue(typeDefault), new ImmutableContext()); + Map typeDefault = new HashMap<>(); + typeDefault.put("key", "0164"); + typeDefault.put("date", "01.01.1990"); - // then - Value value = providerEvaluation.getValue(); - Map valueMap = value.asStructure().asMap(); + // when + ProviderEvaluation providerEvaluation = inProcessResolver.objectEvaluation("objectFlag", + Value.objectToValue(typeDefault), new ImmutableContext()); - assertEquals("0165", valueMap.get("key").asString()); - assertEquals("01.01.2000", valueMap.get("date").asString()); - assertEquals(Reason.STATIC.toString(), providerEvaluation.getReason()); - assertEquals("typeA", providerEvaluation.getVariant()); - } + // then + Value value = providerEvaluation.getValue(); + Map valueMap = value.asStructure().asMap(); - @Test - public void missingFlag() throws Exception { - // given - final Map flagMap = new HashMap<>(); + assertEquals("0165", valueMap.get("key").asString()); + assertEquals("01.01.2000", valueMap.get("date").asString()); + assertEquals(Reason.STATIC.toString(), providerEvaluation.getReason()); + assertEquals("typeA", providerEvaluation.getVariant()); + } - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), - (providerState, changedFlagKeys) -> { - }); + @Test + public void missingFlag() throws Exception { + // given + final Map flagMap = new HashMap<>(); - // when/then - ProviderEvaluation missingFlag = inProcessResolver.booleanEvaluation("missingFlag", false, - new ImmutableContext()); - assertEquals(ErrorCode.FLAG_NOT_FOUND, missingFlag.getErrorCode()); - } + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState, changedFlagKeys, syncMetadata) -> { + }); - @Test - public void disabledFlag() throws Exception { - // given - final Map flagMap = new HashMap<>(); - flagMap.put("disabledFlag", DISABLED_FLAG); + // when/then + ProviderEvaluation missingFlag = inProcessResolver.booleanEvaluation("missingFlag", false, + new ImmutableContext()); + assertEquals(ErrorCode.FLAG_NOT_FOUND, missingFlag.getErrorCode()); + } - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), - (providerState, changedFlagKeys) -> { - }); + @Test + public void disabledFlag() throws Exception { + // given + final Map flagMap = new HashMap<>(); + flagMap.put("disabledFlag", DISABLED_FLAG); - // when/then - ProviderEvaluation disabledFlag = inProcessResolver.booleanEvaluation("disabledFlag", false, - new ImmutableContext()); - assertEquals(ErrorCode.FLAG_NOT_FOUND, disabledFlag.getErrorCode()); - } + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState, changedFlagKeys, syncMetadata) -> { + }); - @Test - public void variantMismatchFlag() throws Exception { - // given - final Map flagMap = new HashMap<>(); - flagMap.put("mismatchFlag", VARIANT_MISMATCH_FLAG); + // when/then + ProviderEvaluation disabledFlag = inProcessResolver.booleanEvaluation("disabledFlag", false, + new ImmutableContext()); + assertEquals(ErrorCode.FLAG_NOT_FOUND, disabledFlag.getErrorCode()); + } - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), - (providerState, changedFlagKeys) -> { - }); + @Test + public void variantMismatchFlag() throws Exception { + // given + final Map flagMap = new HashMap<>(); + flagMap.put("mismatchFlag", VARIANT_MISMATCH_FLAG); - // when/then - assertThrows(TypeMismatchError.class, () -> { - inProcessResolver.booleanEvaluation("mismatchFlag", false, new ImmutableContext()); + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState, changedFlagKeys, syncMetadata) -> { }); - } - @Test - public void typeMismatchEvaluation() throws Exception { - // given - final Map flagMap = new HashMap<>(); - flagMap.put("stringFlag", BOOLEAN_FLAG); + // when/then + assertThrows(TypeMismatchError.class, () -> { + inProcessResolver.booleanEvaluation("mismatchFlag", false, new ImmutableContext()); + }); + } - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), - (providerState, changedFlagKeys) -> { - }); + @Test + public void typeMismatchEvaluation() throws Exception { + // given + final Map flagMap = new HashMap<>(); + flagMap.put("stringFlag", BOOLEAN_FLAG); - // when/then - assertThrows(TypeMismatchError.class, () -> { - inProcessResolver.stringEvaluation("stringFlag", "false", new ImmutableContext()); + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState, changedFlagKeys, syncMetadata) -> { }); - } - @Test - public void booleanShorthandEvaluation() throws Exception { - // given - final Map flagMap = new HashMap<>(); - flagMap.put("shorthand", FLAG_WIH_SHORTHAND_TARGETING); + // when/then + assertThrows(TypeMismatchError.class, () -> { + inProcessResolver.stringEvaluation("stringFlag", "false", new ImmutableContext()); + }); + } - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), - (providerState, changedFlagKeys) -> { - }); + @Test + public void booleanShorthandEvaluation() throws Exception { + // given + final Map flagMap = new HashMap<>(); + flagMap.put("shorthand", FLAG_WIH_SHORTHAND_TARGETING); - ProviderEvaluation providerEvaluation = inProcessResolver.booleanEvaluation("shorthand", false, - new ImmutableContext()); + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState, changedFlagKeys, syncMetadata) -> { + }); - // then - assertEquals(true, providerEvaluation.getValue()); - assertEquals("true", providerEvaluation.getVariant()); - assertEquals(Reason.TARGETING_MATCH.toString(), providerEvaluation.getReason()); - } + ProviderEvaluation providerEvaluation = inProcessResolver.booleanEvaluation("shorthand", false, + new ImmutableContext()); - @Test - public void targetingMatchedEvaluationFlag() throws Exception { - // given - final Map flagMap = new HashMap<>(); - flagMap.put("stringFlag", FLAG_WIH_IF_IN_TARGET); - - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), - (providerState, changedFlagKeys) -> { - }); - - // when - ProviderEvaluation providerEvaluation = inProcessResolver.stringEvaluation("stringFlag", - "loopAlg", - new MutableContext().add("email", "abc@faas.com")); - - // then - assertEquals("binetAlg", providerEvaluation.getValue()); - assertEquals("binet", providerEvaluation.getVariant()); - assertEquals(Reason.TARGETING_MATCH.toString(), providerEvaluation.getReason()); - } + // then + assertEquals(true, providerEvaluation.getValue()); + assertEquals("true", providerEvaluation.getVariant()); + assertEquals(Reason.TARGETING_MATCH.toString(), providerEvaluation.getReason()); + } - @Test - public void targetingUnmatchedEvaluationFlag() throws Exception { - // given - final Map flagMap = new HashMap<>(); - flagMap.put("stringFlag", FLAG_WIH_IF_IN_TARGET); - - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), - (providerState, changedFlagKeys) -> { - }); - - // when - ProviderEvaluation providerEvaluation = inProcessResolver.stringEvaluation("stringFlag", - "loopAlg", - new MutableContext().add("email", "abc@abc.com")); - - // then - assertEquals("loopAlg", providerEvaluation.getValue()); - assertEquals("loop", providerEvaluation.getVariant()); - assertEquals(Reason.DEFAULT.toString(), providerEvaluation.getReason()); - } + @Test + public void targetingMatchedEvaluationFlag() throws Exception { + // given + final Map flagMap = new HashMap<>(); + flagMap.put("stringFlag", FLAG_WIH_IF_IN_TARGET); - @Test - public void explicitTargetingKeyHandling() throws NoSuchFieldException, IllegalAccessException { - // given - final Map flagMap = new HashMap<>(); - flagMap.put("stringFlag", FLAG_WITH_TARGETING_KEY); + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState, changedFlagKeys, syncMetadata) -> { + }); - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), - (providerState, changedFlagKeys) -> { - }); + // when + ProviderEvaluation providerEvaluation = inProcessResolver.stringEvaluation("stringFlag", + "loopAlg", + new MutableContext().add("email", "abc@faas.com")); + + // then + assertEquals("binetAlg", providerEvaluation.getValue()); + assertEquals("binet", providerEvaluation.getVariant()); + assertEquals(Reason.TARGETING_MATCH.toString(), providerEvaluation.getReason()); + } + + @Test + public void targetingUnmatchedEvaluationFlag() throws Exception { + // given + final Map flagMap = new HashMap<>(); + flagMap.put("stringFlag", FLAG_WIH_IF_IN_TARGET); + + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState, changedFlagKeys, syncMetadata) -> { + }); - // when - ProviderEvaluation providerEvaluation = inProcessResolver.stringEvaluation("stringFlag", "loop", - new MutableContext("xyz")); + // when + ProviderEvaluation providerEvaluation = inProcessResolver.stringEvaluation("stringFlag", + "loopAlg", + new MutableContext().add("email", "abc@abc.com")); + + // then + assertEquals("loopAlg", providerEvaluation.getValue()); + assertEquals("loop", providerEvaluation.getVariant()); + assertEquals(Reason.DEFAULT.toString(), providerEvaluation.getReason()); + } + + @Test + public void explicitTargetingKeyHandling() throws NoSuchFieldException, IllegalAccessException { + // given + final Map flagMap = new HashMap<>(); + flagMap.put("stringFlag", FLAG_WITH_TARGETING_KEY); + + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState, changedFlagKeys, syncMetadata) -> { + }); - // then - assertEquals("binetAlg", providerEvaluation.getValue()); - assertEquals("binet", providerEvaluation.getVariant()); - assertEquals(Reason.TARGETING_MATCH.toString(), providerEvaluation.getReason()); - } + // when + ProviderEvaluation providerEvaluation = inProcessResolver.stringEvaluation("stringFlag", "loop", + new MutableContext("xyz")); - @Test - public void targetingErrorEvaluationFlag() throws Exception { - // given - final Map flagMap = new HashMap<>(); - flagMap.put("targetingErrorFlag", FLAG_WIH_INVALID_TARGET); + // then + assertEquals("binetAlg", providerEvaluation.getValue()); + assertEquals("binet", providerEvaluation.getVariant()); + assertEquals(Reason.TARGETING_MATCH.toString(), providerEvaluation.getReason()); + } - InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), - (providerState, changedFlagKeys) -> { - }); + @Test + public void targetingErrorEvaluationFlag() throws Exception { + // given + final Map flagMap = new HashMap<>(); + flagMap.put("targetingErrorFlag", FLAG_WIH_INVALID_TARGET); - // when/then - assertThrows(ParseError.class, () -> { - inProcessResolver.booleanEvaluation("targetingErrorFlag", false, new ImmutableContext()); + InProcessResolver inProcessResolver = getInProcessResolverWth(new MockStorage(flagMap), + (providerState, changedFlagKeys, syncMetadata) -> { }); - } - - @Test - public void validateMetadataInEvaluationResult() throws Exception { - // given - final String scope = "appName=myApp"; - final Map flagMap = new HashMap<>(); - flagMap.put("booleanFlag", BOOLEAN_FLAG); - - InProcessResolver inProcessResolver = getInProcessResolverWth( - FlagdOptions.builder().selector(scope).build(), - new MockStorage(flagMap)); - - // when - ProviderEvaluation providerEvaluation = inProcessResolver.booleanEvaluation("booleanFlag", - false, - new ImmutableContext()); - - // then - final ImmutableMetadata metadata = providerEvaluation.getFlagMetadata(); - assertNotNull(metadata); - assertEquals(scope, metadata.getString("scope")); - } - private InProcessResolver getInProcessResolverWth(final FlagdOptions options, final MockStorage storage) - throws NoSuchFieldException, IllegalAccessException { + // when/then + assertThrows(ParseError.class, () -> { + inProcessResolver.booleanEvaluation("targetingErrorFlag", false, new ImmutableContext()); + }); + } + + @Test + public void validateMetadataInEvaluationResult() throws Exception { + // given + final String scope = "appName=myApp"; + final Map flagMap = new HashMap<>(); + flagMap.put("booleanFlag", BOOLEAN_FLAG); + + InProcessResolver inProcessResolver = getInProcessResolverWth( + FlagdOptions.builder().selector(scope).build(), + new MockStorage(flagMap)); + + // when + ProviderEvaluation providerEvaluation = inProcessResolver.booleanEvaluation("booleanFlag", + false, + new ImmutableContext()); + + // then + final ImmutableMetadata metadata = providerEvaluation.getFlagMetadata(); + assertNotNull(metadata); + assertEquals(scope, metadata.getString("scope")); + } + + private InProcessResolver getInProcessResolverWth(final FlagdOptions options, final MockStorage storage) + throws NoSuchFieldException, IllegalAccessException { + + final InProcessResolver resolver = new InProcessResolver(options, () -> true, + (providerState, changedFlagKeys, syncMetadata) -> { + }); + return injectFlagStore(resolver, storage); + } - final InProcessResolver resolver = new InProcessResolver(options, () -> true, - (providerState, changedFlagKeys) -> { - }); - return injectFlagStore(resolver, storage); - } + private InProcessResolver getInProcessResolverWth(final MockStorage storage, + final TriConsumer, Map> onConnectionEvent) + throws NoSuchFieldException, IllegalAccessException { - private InProcessResolver getInProcessResolverWth(final MockStorage storage, - final BiConsumer> stateConsumer) - throws NoSuchFieldException, IllegalAccessException { + final InProcessResolver resolver = new InProcessResolver( + FlagdOptions.builder().deadline(1000).build(), () -> true, onConnectionEvent); + return injectFlagStore(resolver, storage); + } - final InProcessResolver resolver = new InProcessResolver( - FlagdOptions.builder().deadline(1000).build(), () -> true, stateConsumer); - return injectFlagStore(resolver, storage); - } + // helper to inject flagStore override + private InProcessResolver injectFlagStore(final InProcessResolver resolver, final MockStorage storage) + throws NoSuchFieldException, IllegalAccessException { - // helper to inject flagStore override - private InProcessResolver injectFlagStore(final InProcessResolver resolver, final MockStorage storage) - throws NoSuchFieldException, IllegalAccessException { + final Field flagStore = InProcessResolver.class.getDeclaredField("flagStore"); + flagStore.setAccessible(true); + flagStore.set(resolver, storage); - final Field flagStore = InProcessResolver.class.getDeclaredField("flagStore"); - flagStore.setAccessible(true); - flagStore.set(resolver, storage); - - return resolver; - } + return resolver; + } } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java index 3e69d48c5..44e806d48 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java @@ -2,14 +2,15 @@ import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag; import dev.openfeature.contrib.providers.flagd.resolver.process.model.FlagParser; -import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload; -import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayloadType; +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload; +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType; import org.junit.Assert; import org.junit.jupiter.api.Test; import java.time.Duration; import java.util.HashSet; import java.util.Map; +import java.util.Collections; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.Collectors; @@ -27,7 +28,7 @@ class FlagStoreTest { public void connectorHandling() throws Exception { final int maxDelay = 500; - final BlockingQueue payload = new LinkedBlockingQueue<>(); + final BlockingQueue payload = new LinkedBlockingQueue<>(); FlagStore store = new FlagStore(new MockConnector(payload), true); store.init(); @@ -35,7 +36,7 @@ public void connectorHandling() throws Exception { // OK for simple flag assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> { - payload.offer(new StreamPayload(StreamPayloadType.DATA, getFlagsFromResource(VALID_SIMPLE))); + payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_SIMPLE), Collections.emptyMap())); }); assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> { @@ -44,7 +45,7 @@ public void connectorHandling() throws Exception { // STALE for invalid flag assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> { - payload.offer(new StreamPayload(StreamPayloadType.DATA, getFlagsFromResource(INVALID_FLAG))); + payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(INVALID_FLAG), Collections.emptyMap())); }); assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> { @@ -53,7 +54,7 @@ public void connectorHandling() throws Exception { // OK again for next payload assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> { - payload.offer(new StreamPayload(StreamPayloadType.DATA, getFlagsFromResource(VALID_LONG))); + payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_LONG), Collections.emptyMap())); }); assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> { @@ -62,7 +63,7 @@ public void connectorHandling() throws Exception { // ERROR is propagated correctly assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> { - payload.offer(new StreamPayload(StreamPayloadType.ERROR, null)); + payload.offer(new QueuePayload(QueuePayloadType.ERROR, null, Collections.emptyMap())); }); assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> { @@ -80,13 +81,13 @@ public void connectorHandling() throws Exception { @Test public void changedFlags() throws Exception { final int maxDelay = 500; - final BlockingQueue payload = new LinkedBlockingQueue<>(); + final BlockingQueue payload = new LinkedBlockingQueue<>(); FlagStore store = new FlagStore(new MockConnector(payload), true); store.init(); final BlockingQueue storageStateDTOS = store.getStateQueue(); assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> { - payload.offer(new StreamPayload(StreamPayloadType.DATA, getFlagsFromResource(VALID_SIMPLE))); + payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_SIMPLE), Collections.emptyMap())); }); // flags changed for first time assertEquals(FlagParser.parseString( @@ -94,7 +95,7 @@ public void changedFlags() throws Exception { storageStateDTOS.take().getChangedFlagsKeys()); assertTimeoutPreemptively(Duration.ofMillis(maxDelay), ()-> { - payload.offer(new StreamPayload(StreamPayloadType.DATA, getFlagsFromResource(VALID_LONG))); + payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_LONG), Collections.emptyMap())); }); Map expectedChangedFlags = FlagParser.parseString(getFlagsFromResource(VALID_LONG),true); diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/MockConnector.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/MockConnector.java index 495b69778..be1429475 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/MockConnector.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/MockConnector.java @@ -1,18 +1,19 @@ package dev.openfeature.contrib.providers.flagd.resolver.process.storage; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector; -import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload; -import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayloadType; +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload; +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.BlockingQueue; +import java.util.Collections; @Slf4j public class MockConnector implements Connector { - private BlockingQueue mockQueue; + private BlockingQueue mockQueue; - public MockConnector(final BlockingQueue mockQueue) { + public MockConnector(final BlockingQueue mockQueue) { this.mockQueue = mockQueue; } @@ -20,13 +21,13 @@ public void init() { // no-op } - public BlockingQueue getStream() { + public BlockingQueue getStream() { return mockQueue; } public void shutdown() { // Emit error mocking closed connection scenario - if (!mockQueue.offer(new StreamPayload(StreamPayloadType.ERROR, "shutdown invoked"))) { + if (!mockQueue.offer(new QueuePayload(QueuePayloadType.ERROR, "shutdown invoked", Collections.emptyMap()))) { log.warn("Failed to offer shutdown status"); } } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/file/FileConnectorTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/file/FileConnectorTest.java index 0f3b2f379..a2ad795c4 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/file/FileConnectorTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/file/FileConnectorTest.java @@ -1,7 +1,7 @@ package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.file; -import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload; -import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayloadType; +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload; +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -31,16 +31,16 @@ void readAndExposeFeatureFlagsFromSource() throws IOException { connector.init(); // then - final BlockingQueue stream = connector.getStream(); - final StreamPayload[] payload = new StreamPayload[1]; + final BlockingQueue stream = connector.getStream(); + final QueuePayload[] payload = new QueuePayload[1]; assertNotNull(stream); assertTimeoutPreemptively(Duration.ofMillis(200), () -> { payload[0] = stream.take(); }); - assertNotNull(payload[0].getData()); - assertEquals(StreamPayloadType.DATA, payload[0].getType()); + assertNotNull(payload[0].getFlagData()); + assertEquals(QueuePayloadType.DATA, payload[0].getType()); } @Test @@ -52,16 +52,16 @@ void emitErrorStateForInvalidPath() throws IOException { connector.init(); // then - final BlockingQueue stream = connector.getStream(); + final BlockingQueue stream = connector.getStream(); // Must emit an error within considerable time - final StreamPayload[] payload = new StreamPayload[1]; + final QueuePayload[] payload = new QueuePayload[1]; assertTimeoutPreemptively(Duration.ofMillis(200), () -> { payload[0] = stream.take(); }); - assertNotNull(payload[0].getData()); - assertEquals(StreamPayloadType.ERROR, payload[0].getType()); + assertNotNull(payload[0].getFlagData()); + assertEquals(QueuePayloadType.ERROR, payload[0].getType()); } @Test @@ -80,15 +80,15 @@ void watchForFileUpdatesAndEmitThem() throws IOException { connector.init(); // then - final BlockingQueue stream = connector.getStream(); - final StreamPayload[] payload = new StreamPayload[1]; + final BlockingQueue stream = connector.getStream(); + final QueuePayload[] payload = new QueuePayload[1]; // first validate the initial payload assertTimeoutPreemptively(Duration.ofMillis(200), () -> { payload[0] = stream.take(); }); - assertEquals(initial, payload[0].getData()); + assertEquals(initial, payload[0].getFlagData()); // then update the flags Files.write(updPath, updatedFlags.getBytes(), StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING); @@ -98,7 +98,7 @@ void watchForFileUpdatesAndEmitThem() throws IOException { payload[0] = stream.take(); }); - assertEquals(updatedFlags, payload[0].getData()); + assertEquals(updatedFlags, payload[0].getFlagData()); } } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnectorTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnectorTest.java index 10232f1c1..083915a26 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnectorTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnectorTest.java @@ -1,12 +1,15 @@ package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc; +import static org.junit.Assert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.lang.reflect.Field; import java.time.Duration; @@ -16,10 +19,14 @@ import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import com.google.protobuf.Struct; + import dev.openfeature.contrib.providers.flagd.FlagdOptions; -import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload; -import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayloadType; +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload; +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType; +import dev.openfeature.flagd.grpc.sync.FlagSyncServiceGrpc.FlagSyncServiceBlockingStub; import dev.openfeature.flagd.grpc.sync.FlagSyncServiceGrpc.FlagSyncServiceStub; +import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse; import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsRequest; import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsResponse; @@ -57,8 +64,19 @@ public void connectionParameters() throws Throwable { @Test public void grpcConnectionStatus() throws Throwable { // given + final String key = "key1"; + final String val = "value1"; final GrpcStreamConnector connector = new GrpcStreamConnector(FlagdOptions.builder().build()); final FlagSyncServiceStub stubMock = mockStubAndReturn(connector); + final FlagSyncServiceBlockingStub blockingStubMock = mockBlockingStubAndReturn(connector); + +final Struct metadata = Struct.newBuilder() + .putFields(key, + com.google.protobuf.Value.newBuilder().setStringValue(val).build()) + .build(); + + + when(blockingStubMock.getMetadata(any())).thenReturn(GetMetadataResponse.newBuilder().setMetadata(metadata).build()); final GrpcStreamHandler[] injectedHandler = new GrpcStreamHandler[1]; @@ -71,12 +89,13 @@ public void grpcConnectionStatus() throws Throwable { connector.init(); // verify and wait for initialization verify(stubMock, Mockito.timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any()); + verify(blockingStubMock).getMetadata(any()); // then final GrpcStreamHandler grpcStreamHandler = injectedHandler[0]; assertNotNull(grpcStreamHandler); - final BlockingQueue streamPayloads = connector.getStream(); + final BlockingQueue streamPayloads = connector.getStream(); // accepted data grpcStreamHandler.onNext( @@ -84,8 +103,9 @@ public void grpcConnectionStatus() throws Throwable { .build()); assertTimeoutPreemptively(MAX_WAIT_MS, () -> { - StreamPayload payload = streamPayloads.take(); - assertEquals(StreamPayloadType.DATA, payload.getType()); + QueuePayload payload = streamPayloads.take(); + assertEquals(QueuePayloadType.DATA, payload.getType()); + assertTrue(() -> payload.getSyncMetadata().get(key).equals(val)); }); // ping must be ignored @@ -99,8 +119,8 @@ public void grpcConnectionStatus() throws Throwable { .build()); assertTimeoutPreemptively(MAX_WAIT_MS, () -> { - StreamPayload payload = streamPayloads.take(); - assertEquals(StreamPayloadType.DATA, payload.getType()); + QueuePayload payload = streamPayloads.take(); + assertEquals(QueuePayloadType.DATA, payload.getType()); }); } @@ -109,6 +129,7 @@ public void listenerExitOnShutdown() throws Throwable { // given final GrpcStreamConnector connector = new GrpcStreamConnector(FlagdOptions.builder().build()); final FlagSyncServiceStub stubMock = mockStubAndReturn(connector); + final FlagSyncServiceBlockingStub blockingStubMock = mockBlockingStubAndReturn(connector); final GrpcStreamHandler[] injectedHandler = new GrpcStreamHandler[1]; @@ -121,6 +142,7 @@ public void listenerExitOnShutdown() throws Throwable { connector.init(); // verify and wait for initialization verify(stubMock, Mockito.timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any()); + verify(blockingStubMock).getMetadata(any()); // then final GrpcStreamHandler grpcStreamHandler = injectedHandler[0]; @@ -132,8 +154,8 @@ public void listenerExitOnShutdown() throws Throwable { grpcStreamHandler.onError(new Exception("Channel closed, exiting")); assertTimeoutPreemptively(MAX_WAIT_MS, () -> { - StreamPayload payload = connector.getStream().take(); - assertEquals(StreamPayloadType.ERROR, payload.getType()); + QueuePayload payload = connector.getStream().take(); + assertEquals(QueuePayloadType.ERROR, payload.getType()); }); // Validate mock calls & no more event propagation @@ -161,4 +183,16 @@ private static FlagSyncServiceStub mockStubAndReturn(final GrpcStreamConnector c return stubMock; } + private static FlagSyncServiceBlockingStub mockBlockingStubAndReturn(final GrpcStreamConnector connector) + throws Throwable { + final Field blockingStubField = GrpcStreamConnector.class.getDeclaredField("serviceBlockingStub"); + blockingStubField.setAccessible(true); + + final FlagSyncServiceBlockingStub blockingStubMock = Mockito.mock(FlagSyncServiceBlockingStub.class); + + blockingStubField.set(connector, blockingStubMock); + + return blockingStubMock; + } + }