Skip to content

Commit

Permalink
fixup: fixing rpc mode - in-process still buggy on reconnect
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Schrottner <[email protected]>
  • Loading branch information
aepfli committed Jan 14, 2025
1 parent 0fe3c4f commit 82db146
Show file tree
Hide file tree
Showing 14 changed files with 118 additions and 85 deletions.
6 changes: 5 additions & 1 deletion providers/flagd/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,11 @@
<version>1.20.4</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.16</version>
</dependency>
</dependencies>

<dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ public boolean isConnected() {
return this.connected == ConnectionState.CONNECTED;
}

/**
* Indicates
* whether the current connection state is disconnected.
*
* @return {@code true} if disconnected, otherwise {@code false}.
*/
public boolean isDisconnected() {
return this.connected == ConnectionState.DISCONNECTED;
}
/**
* Indicates
* whether the current connection state is stale.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ public GrpcConnector(
ManagedChannel channel) {

this.channel = channel;
this.serviceStub = stub.apply(channel);
this.blockingStub = blockingStub.apply(channel);
this.serviceStub = stub.apply(channel).withWaitForReady();
this.blockingStub = blockingStub.apply(channel).withWaitForReady();
this.deadline = options.getDeadline();
this.streamDeadlineMs = options.getStreamDeadlineMs();
this.onConnectionEvent = onConnectionEvent;
Expand Down Expand Up @@ -190,7 +190,6 @@ private synchronized void onReady() {
log.debug("Reconnection task cancelled as connection became READY.");
}
restartStream();
this.onConnectionEvent.accept(new ConnectionEvent(true));
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dev.openfeature.contrib.providers.flagd.resolver.grpc;

import com.google.protobuf.Value;
import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionEvent;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
import dev.openfeature.flagd.grpc.evaluation.Evaluation.EventStreamResponse;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
Expand All @@ -10,6 +11,7 @@
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -20,25 +22,18 @@
@SuppressFBWarnings(justification = "cache needs to be read and write by multiple objects")
class EventStreamObserver implements StreamObserver<EventStreamResponse> {

/**
* A consumer to handle connection events with a flag indicating success and a list of changed flags.
*/
private final BiConsumer<Boolean, List<String>> onConnectionEvent;

/**
* The cache to update based on received events.
*/
private final Cache cache;
private final Consumer<List<String>> onConfigurationChange;
private final Consumer<ConnectionEvent> onReady;

/**
* Constructs a new {@code EventStreamObserver} instance.
*
* @param cache the cache to update based on received events
* @param onConnectionEvent a consumer to handle connection events with a boolean and a list of changed flags
*/
EventStreamObserver(Cache cache, BiConsumer<Boolean, List<String>> onConnectionEvent) {
this.cache = cache;
this.onConnectionEvent = onConnectionEvent;
EventStreamObserver(Consumer<List<String>> onConfigurationChange, Consumer<ConnectionEvent> onReady) {
this.onConfigurationChange = onConfigurationChange;
this.onReady = onReady;
}

/**
Expand All @@ -60,27 +55,14 @@ public void onNext(EventStreamResponse value) {
}
}

/**
* Called when an error occurs in the stream.
*
* @param throwable the error that occurred
*/
@Override
public void onError(Throwable throwable) {
if (this.cache.getEnabled().equals(Boolean.TRUE)) {
this.cache.clear();
}

}

/**
* Called when the stream is completed.
*/
@Override
public void onCompleted() {
if (this.cache.getEnabled().equals(Boolean.TRUE)) {
this.cache.clear();
}
this.onConnectionEvent.accept(false, Collections.emptyList());

}

/**
Expand All @@ -90,33 +72,22 @@ public void onCompleted() {
*/
private void handleConfigurationChangeEvent(EventStreamResponse value) {
List<String> changedFlags = new ArrayList<>();
boolean cachingEnabled = this.cache.getEnabled();

Map<String, Value> data = value.getData().getFieldsMap();
Value flagsValue = data.get(Constants.FLAGS_KEY);
if (flagsValue == null) {
if (cachingEnabled) {
this.cache.clear();
}
} else {
if (flagsValue != null) {
Map<String, Value> flags = flagsValue.getStructValue().getFieldsMap();
for (String flagKey : flags.keySet()) {
changedFlags.add(flagKey);
if (cachingEnabled) {
this.cache.remove(flagKey);
}
}
changedFlags.addAll(flags.keySet());
}

this.onConnectionEvent.accept(true, changedFlags);
onConfigurationChange.accept(changedFlags);
}

/**
* Handles provider readiness events by clearing the cache (if enabled) and notifying listeners of readiness.
*/
private void handleProviderReadyEvent() {
if (this.cache.getEnabled().equals(Boolean.TRUE)) {
this.cache.clear();
}
log.info("Received provider ready event");
onReady.accept(new ConnectionEvent(true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,23 @@ public GrpcResolver(
options,
ServiceGrpc::newStub,
ServiceGrpc::newBlockingStub,
onConnectionEvent,
(event) -> {
if( cache != null && event.isDisconnected()) {
cache.clear();
}
onConnectionEvent.accept(event);
},
stub -> stub.eventStream(
Evaluation.EventStreamRequest.getDefaultInstance(),
new EventStreamObserver(
cache,
(k, e) ->
onConnectionEvent.accept(new ConnectionEvent(ConnectionState.CONNECTED, e)))));
(flags) -> {
if( cache != null) {
flags.forEach(cache::remove);
}
onConnectionEvent.accept(new ConnectionEvent(ConnectionState.CONNECTED, flags));
},
onConnectionEvent
)));
}

/**
Expand Down Expand Up @@ -207,7 +217,7 @@ private <T> Boolean isEvaluationCacheable(ProviderEvaluation<T> evaluation) {
}

private Boolean cacheAvailable() {
return this.cache.getEnabled() && this.connector.isConnected();
return this.cache.getEnabled();
}

private static ImmutableMetadata metadataFromResponse(Message response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@
@IncludeEngines("cucumber")
@SelectFile("test-harness/gherkin/config.feature")
@ConfigurationParameter(key = PLUGIN_PROPERTY_NAME, value = "pretty")
@ConfigurationParameter(key = GLUE_PROPERTY_NAME, value = "dev.openfeature.contrib.providers.flagd.e2e.steps.config")
@ConfigurationParameter(key = GLUE_PROPERTY_NAME, value = "dev.openfeature.contrib.providers.flagd.e2e.steps")
public class RunConfigCucumberTest {}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
@ConfigurationParameter(key = GLUE_PROPERTY_NAME, value = "dev.openfeature.contrib.providers.flagd.e2e.steps")
@ConfigurationParameter(key = OBJECT_FACTORY_PROPERTY_NAME, value = "io.cucumber.picocontainer.PicoFactory")
@IncludeTags("in-process")
@ExcludeTags({"unixsocket", "customCert"})
@ExcludeTags({"unixsocket", "customCert", "targetURI"})
@Testcontainers
public class RunInProcessTest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.junit.platform.suite.api.ExcludeTags;
import org.junit.platform.suite.api.IncludeEngines;
import org.junit.platform.suite.api.IncludeTags;
import org.junit.platform.suite.api.SelectDirectories;
import org.junit.platform.suite.api.SelectFile;
import org.junit.platform.suite.api.Suite;
import org.testcontainers.junit.jupiter.Testcontainers;
Expand All @@ -21,13 +22,13 @@
@Order(value = Integer.MAX_VALUE)
@Suite
@IncludeEngines("cucumber")
// @SelectDirectories("test-harness/gherkin")
@SelectFile("test-harness/gherkin/connection.feature")
@SelectDirectories("test-harness/gherkin")
//@SelectFile("test-harness/gherkin/rpc-caching.feature")
@ConfigurationParameter(key = PLUGIN_PROPERTY_NAME, value = "pretty")
@ConfigurationParameter(key = GLUE_PROPERTY_NAME, value = "dev.openfeature.contrib.providers.flagd.e2e.steps")
@ConfigurationParameter(key = OBJECT_FACTORY_PROPERTY_NAME, value = "io.cucumber.picocontainer.PicoFactory")
@IncludeTags({"rpc", "reconnect"})
@ExcludeTags({"targetURI", "customCert", "unixsocket"})
@IncludeTags({"rpc"})
@ExcludeTags({ "unixsocket", "targetURI"})
@Testcontainers
public class RunRpcTest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.cucumber.java.en.Given;
import io.cucumber.java.en.Then;
import io.cucumber.java.en.When;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
Expand Down Expand Up @@ -57,7 +58,7 @@ public void we_initialize_a_config_for(String string) {

@Given("an option {string} of type {string} with value {string}")
public void we_have_an_option_of_type_with_value(String option, String type, String value)
throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
throws Throwable {
if (IGNORED_FOR_NOW.contains(option)) {
LOG.error("option '{}' is not supported", option);
return;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package dev.openfeature.contrib.providers.flagd.e2e.steps;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import dev.openfeature.contrib.providers.flagd.e2e.State;
import dev.openfeature.sdk.FlagEvaluationDetails;
Expand All @@ -18,12 +20,12 @@ public FlagSteps(State state) {
}

@Given("a {}-flag with key {string} and a default value {string}")
public void givenAFlag(String type, String name, String defaultValue) throws ClassNotFoundException {
public void givenAFlag(String type, String name, String defaultValue) throws Throwable {
state.flag = new Flag(type, name, Utils.convert(defaultValue, type));
}

@When("the flag was evaluated with details")
public void the_flag_was_evaluated_with_details() {
public void the_flag_was_evaluated_with_details() throws InterruptedException {
FlagEvaluationDetails details;
switch (state.flag.type) {
case "String":
Expand Down Expand Up @@ -52,8 +54,8 @@ public void the_flag_was_evaluated_with_details() {
state.evaluation = details;
}

@Then("the resolved details value should be {string}")
public void the_resolved_details_value_should_be(String value) throws ClassNotFoundException {
@Then("the resolved details value should be \"{}\"")
public void the_resolved_details_value_should_be(String value) throws Throwable {
assertThat(state.evaluation.getValue()).isEqualTo(Utils.convert(value, state.flag.type));
}

Expand All @@ -66,13 +68,14 @@ public void the_reason_should_be(String reason) {
public void the_variant_should_be(String variant) {
assertThat(state.evaluation.getVariant()).isEqualTo(variant);
}

@Then("the flag should be part of the event payload")
@Then("the flag was modified")
public void the_flag_was_modified() {
assertThat(state.lastEvent).isPresent().hasValueSatisfying((event) -> {
assertThat(event.type).isEqualTo("change");
assertThat(event.details.getFlagsChanged()).contains(state.flag.name);
});
await().atMost(5000, MILLISECONDS)
.until(() -> state.events.stream().anyMatch(event -> event.type.equals("change") && event.details.getFlagsChanged().contains(state.flag.name)));
state.lastEvent = state.events.stream()
.filter(event -> event.type.equals("change") && event.details.getFlagsChanged().contains(state.flag.name))
.findFirst();
}

public class Flag {
Expand Down
Loading

0 comments on commit 82db146

Please sign in to comment.