Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(flagd): migrate file to own provider type #1173

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions providers/flagd/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
tmp/
41 changes: 21 additions & 20 deletions providers/flagd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ The value is updated with every (re)connection to the sync implementation.
This can be used to enrich evaluations with such data.
If the `in-process` mode is not used, and before the provider is ready, the `getSyncMetadata` returns an empty map.

#### Offline mode
### Offline mode (File resolver)

In-process resolvers can also work in an offline mode.
To enable this mode, you should provide a valid flag configuration file with the option `offlineFlagSourcePath`.

```java
FlagdProvider flagdProvider = new FlagdProvider(
FlagdOptions.builder()
.resolverType(Config.Resolver.IN_PROCESS)
.resolverType(Config.Resolver.FILE)
.offlineFlagSourcePath("PATH")
.build());
```
Expand Down Expand Up @@ -103,24 +103,25 @@ variables.

Given below are the supported configurations:

| Option name | Environment variable name | Type & Values | Default | Compatible resolver |
| --------------------- | ------------------------------ | ------------------------ | --------- | ------------------- |
| resolver | FLAGD_RESOLVER | String - rpc, in-process | rpc | |
| host | FLAGD_HOST | String | localhost | rpc & in-process |
| port | FLAGD_PORT | int | 8013 | rpc & in-process |
| targetUri | FLAGD_TARGET_URI | string | null | rpc & in-process |
| tls | FLAGD_TLS | boolean | false | rpc & in-process |
| socketPath | FLAGD_SOCKET_PATH | String | null | rpc & in-process |
| certPath | FLAGD_SERVER_CERT_PATH | String | null | rpc & in-process |
| deadline | FLAGD_DEADLINE_MS | int | 500 | rpc & in-process |
| streamDeadlineMs | FLAGD_STREAM_DEADLINE_MS | int | 600000 | rpc & in-process |
| keepAliveTime | FLAGD_KEEP_ALIVE_TIME_MS | long | 0 | rpc & in-process |
| selector | FLAGD_SOURCE_SELECTOR | String | null | in-process |
| cache | FLAGD_CACHE | String - lru, disabled | lru | rpc |
| maxCacheSize | FLAGD_MAX_CACHE_SIZE | int | 1000 | rpc |
| maxEventStreamRetries | FLAGD_MAX_EVENT_STREAM_RETRIES | int | 5 | rpc |
| retryBackoffMs | FLAGD_RETRY_BACKOFF_MS | int | 1000 | rpc |
| offlineFlagSourcePath | FLAGD_OFFLINE_FLAG_SOURCE_PATH | String | null | in-process |
| Option name | Environment variable name | Type & Values | Default | Compatible resolver |
|-----------------------|--------------------------------|--------------------------|-----------|-------------------------|
| resolver | FLAGD_RESOLVER | String - rpc, in-process | rpc | |
| host | FLAGD_HOST | String | localhost | rpc & in-process |
| port | FLAGD_PORT | int | 8013 | rpc & in-process |
| targetUri | FLAGD_TARGET_URI | string | null | rpc & in-process |
| tls | FLAGD_TLS | boolean | false | rpc & in-process |
| socketPath | FLAGD_SOCKET_PATH | String | null | rpc & in-process |
| certPath | FLAGD_SERVER_CERT_PATH | String | null | rpc & in-process |
| deadline | FLAGD_DEADLINE_MS | int | 500 | rpc & in-process & file |
| streamDeadlineMs | FLAGD_STREAM_DEADLINE_MS | int | 600000 | rpc & in-process |
| keepAliveTime | FLAGD_KEEP_ALIVE_TIME_MS | long | 0 | rpc & in-process |
| selector | FLAGD_SOURCE_SELECTOR | String | null | in-process |
| cache | FLAGD_CACHE | String - lru, disabled | lru | rpc |
| maxCacheSize | FLAGD_MAX_CACHE_SIZE | int | 1000 | rpc |
| maxEventStreamRetries | FLAGD_MAX_EVENT_STREAM_RETRIES | int | 5 | rpc |
| retryBackoffMs | FLAGD_RETRY_BACKOFF_MS | int | 1000 | rpc |
| offlineFlagSourcePath | FLAGD_OFFLINE_FLAG_SOURCE_PATH | String | null | file |
| offlinePollIntervalMs | FLAGD_OFFLINE_POLL_MS | int | 5000 | file |

> [!NOTE]
> Some configurations are only applicable for RPC resolver.
Expand Down
2 changes: 1 addition & 1 deletion providers/flagd/schemas
2 changes: 1 addition & 1 deletion providers/flagd/spec
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public final class Config {
static final int DEFAULT_STREAM_DEADLINE_MS = 10 * 60 * 1000;
static final int DEFAULT_STREAM_RETRY_GRACE_PERIOD = 5;
static final int DEFAULT_MAX_CACHE_SIZE = 1000;
static final int DEFAULT_OFFLINE_POLL_MS = 5000;
static final long DEFAULT_KEEP_ALIVE = 0;

static final String RESOLVER_ENV_VAR = "FLAGD_RESOLVER";
Expand All @@ -33,13 +34,15 @@ public final class Config {
static final String STREAM_DEADLINE_MS_ENV_VAR_NAME = "FLAGD_STREAM_DEADLINE_MS";
static final String SOURCE_SELECTOR_ENV_VAR_NAME = "FLAGD_SOURCE_SELECTOR";
static final String OFFLINE_SOURCE_PATH = "FLAGD_OFFLINE_FLAG_SOURCE_PATH";
static final String OFFLINE_POLL_MS = "FLAGD_OFFLINE_POLL_MS";
static final String KEEP_ALIVE_MS_ENV_VAR_NAME_OLD = "FLAGD_KEEP_ALIVE_TIME";
static final String KEEP_ALIVE_MS_ENV_VAR_NAME = "FLAGD_KEEP_ALIVE_TIME_MS";
static final String TARGET_URI_ENV_VAR_NAME = "FLAGD_TARGET_URI";
static final String STREAM_RETRY_GRACE_PERIOD = "FLAGD_RETRY_GRACE_PERIOD";

static final String RESOLVER_RPC = "rpc";
static final String RESOLVER_IN_PROCESS = "in-process";
static final String RESOLVER_FILE = "file";

public static final String STATIC_REASON = "STATIC";
public static final String CACHED_REASON = "CACHED";
Expand Down Expand Up @@ -87,6 +90,8 @@ static Resolver fromValueProvider(Function<String, String> provider) {
return Resolver.IN_PROCESS;
case "rpc":
return Resolver.RPC;
case "file":
return Resolver.FILE;
default:
log.warn("Unsupported resolver variable: {}", resolverVar);
return DEFAULT_RESOLVER_TYPE;
Expand Down Expand Up @@ -143,6 +148,11 @@ public String asString() {
public String asString() {
return RESOLVER_IN_PROCESS;
}
},
FILE {
public String asString() {
return RESOLVER_FILE;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.function.Function;
import lombok.Builder;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;

/**
* FlagdOptions is a builder to build flagd provider options.
Expand Down Expand Up @@ -119,8 +120,14 @@ public class FlagdOptions {
* File source of flags to be used by offline mode.
* Setting this enables the offline mode of the in-process provider.
*/
private String offlineFlagSourcePath;

/**
* File polling interval.
* Defaults to 0 (disabled).
**/
@Builder.Default
private String offlineFlagSourcePath = fallBackToEnvOrDefault(Config.OFFLINE_SOURCE_PATH, null);
private int offlinePollIntervalMs = fallBackToEnvOrDefault(Config.OFFLINE_POLL_MS, Config.DEFAULT_OFFLINE_POLL_MS);

/**
* gRPC custom target string.
Expand Down Expand Up @@ -193,7 +200,20 @@ void prebuild() {
resolverType = fromValueProvider(System::getenv);
}

if (port == 0) {
if (StringUtils.isBlank(offlineFlagSourcePath)) {
offlineFlagSourcePath = fallBackToEnvOrDefault(Config.OFFLINE_SOURCE_PATH, null);
}

if (!StringUtils.isEmpty(offlineFlagSourcePath) && resolverType == Config.Resolver.IN_PROCESS) {
resolverType = Config.Resolver.FILE;
}

// We need a file path for FILE Provider
if (StringUtils.isEmpty(offlineFlagSourcePath) && resolverType == Config.Resolver.FILE) {
throw new IllegalArgumentException("Resolver Type 'FILE' requires a offlineFlagSourcePath");
}

if (port == 0 && resolverType != Config.Resolver.FILE) {
port = Integer.parseInt(
fallBackToEnvOrDefault(Config.PORT_ENV_VAR_NAME, determineDefaultPortForResolver()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public FlagdProvider() {
*/
public FlagdProvider(final FlagdOptions options) {
switch (options.getResolverType().asString()) {
case Config.RESOLVER_FILE:
case Config.RESOLVER_IN_PROCESS:
this.flagResolver = new InProcessResolver(options, this::onProviderEvent);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ public GrpcResolver(
Evaluation.EventStreamRequest.getDefaultInstance(),
new EventStreamObserver(
(flags) -> {
if (cache != null) {
flags.forEach(cache::remove);
if (this.cache != null) {
flags.forEach(this.cache::remove);
}
onProviderEvent.accept(new FlagdProviderEvent(
ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, flags));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ static Connector getConnector(final FlagdOptions options, Consumer<FlagdProvider
}
return options.getOfflineFlagSourcePath() != null
&& !options.getOfflineFlagSourcePath().isEmpty()
? new FileConnector(options.getOfflineFlagSourcePath())
? new FileConnector(options.getOfflineFlagSourcePath(), options.getOfflinePollIntervalMs())
: new GrpcStreamConnector(options, onConnectionEvent);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@
@Slf4j
public class FileConnector implements Connector {

private static final int POLL_INTERVAL_MS = 5000;
private static final String OFFER_WARN = "Unable to offer file content to queue: queue is full";

private final String flagSourcePath;
private final int pollInterval;
private final BlockingQueue<QueuePayload> queue = new LinkedBlockingQueue<>(1);
private boolean shutdown = false;

public FileConnector(final String flagSourcePath) {
public FileConnector(final String flagSourcePath, int pollInterval) {
this.flagSourcePath = flagSourcePath;
this.pollInterval = pollInterval;
}

/**
Expand Down Expand Up @@ -64,7 +65,7 @@ public void init() throws IOException {
}
}

Thread.sleep(POLL_INTERVAL_MS);
Thread.sleep(pollInterval);
}

log.info("Shutting down file connector.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ void TestBuilderOptions() {
.cacheType("lru")
.maxCacheSize(100)
.selector("app=weatherApp")
.offlineFlagSourcePath("some-path")
.openTelemetry(openTelemetry)
.customConnector(connector)
.resolverType(Resolver.IN_PROCESS)
Expand All @@ -76,7 +75,6 @@ void TestBuilderOptions() {
assertEquals("lru", flagdOptions.getCacheType());
assertEquals(100, flagdOptions.getMaxCacheSize());
assertEquals("app=weatherApp", flagdOptions.getSelector());
assertEquals("some-path", flagdOptions.getOfflineFlagSourcePath());
assertEquals(openTelemetry, flagdOptions.getOpenTelemetry());
assertEquals(connector, flagdOptions.getCustomConnector());
assertEquals(Resolver.IN_PROCESS, flagdOptions.getResolverType());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package dev.openfeature.contrib.providers.flagd.e2e;

import static io.cucumber.junit.platform.engine.Constants.GLUE_PROPERTY_NAME;
import static io.cucumber.junit.platform.engine.Constants.OBJECT_FACTORY_PROPERTY_NAME;
import static io.cucumber.junit.platform.engine.Constants.PLUGIN_PROPERTY_NAME;

import dev.openfeature.contrib.providers.flagd.Config;
import org.apache.logging.log4j.core.config.Order;
import org.junit.platform.suite.api.BeforeSuite;
import org.junit.platform.suite.api.ConfigurationParameter;
import org.junit.platform.suite.api.ExcludeTags;
import org.junit.platform.suite.api.IncludeEngines;
import org.junit.platform.suite.api.IncludeTags;
import org.junit.platform.suite.api.SelectDirectories;
import org.junit.platform.suite.api.Suite;
import org.testcontainers.junit.jupiter.Testcontainers;

/**
* Class for running the reconnection tests for the RPC provider
*/
@Order(value = Integer.MAX_VALUE)
@Suite
@IncludeEngines("cucumber")
@SelectDirectories("test-harness/gherkin")
// if you want to run just one feature file, use the following line instead of @SelectDirectories
// @SelectFile("test-harness/gherkin/connection.feature")
@ConfigurationParameter(key = PLUGIN_PROPERTY_NAME, value = "pretty")
@ConfigurationParameter(key = GLUE_PROPERTY_NAME, value = "dev.openfeature.contrib.providers.flagd.e2e.steps")
@ConfigurationParameter(key = OBJECT_FACTORY_PROPERTY_NAME, value = "io.cucumber.picocontainer.PicoFactory")
@IncludeTags("file")
@ExcludeTags({"unixsocket", "targetURI", "reconnect", "customCert"})
@Testcontainers
public class RunFileTest {

@BeforeSuite
public static void before() {
State.resolverType = Config.Resolver.FILE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ public class State {
public FlagdOptions options;
public FlagdOptions.FlagdOptionsBuilder builder = FlagdOptions.builder();
public static Config.Resolver resolverType;
public boolean hasError;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import static io.restassured.RestAssured.when;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import dev.openfeature.contrib.providers.flagd.Config;
import dev.openfeature.contrib.providers.flagd.FlagdProvider;
import dev.openfeature.contrib.providers.flagd.e2e.FlagdContainer;
Expand All @@ -21,7 +19,6 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.parallel.Isolated;
Expand All @@ -47,7 +44,7 @@ public static void beforeAll() throws IOException {
sharedTempDir = Files.createDirectories(
Paths.get("tmp/" + RandomStringUtils.randomAlphanumeric(8).toLowerCase() + "/"));
container = new FlagdContainer()
.withFileSystemBind(sharedTempDir.toAbsolutePath().toString(), "/tmp", BindMode.READ_WRITE);
.withFileSystemBind(sharedTempDir.toAbsolutePath().toString(), "/flags", BindMode.READ_WRITE);
}

@AfterAll
Expand Down Expand Up @@ -78,10 +75,15 @@ public void setupProvider(String providerType) throws IOException, InterruptedEx
String flagdConfig = "default";
state.builder.deadline(1000).keepAlive(0).retryGracePeriod(2);
boolean wait = true;

switch (providerType) {
case "unavailable":
this.state.providerType = ProviderType.SOCKET;
state.builder.port(UNAVAILABLE_PORT);
if (State.resolverType == Config.Resolver.FILE) {

state.builder.offlineFlagSourcePath("not-existing");
}
wait = false;
break;
case "socket":
Expand All @@ -103,34 +105,28 @@ public void setupProvider(String providerType) throws IOException, InterruptedEx
.certPath(absolutePath);
flagdConfig = "ssl";
break;
case "offline":
State.resolverType = Config.Resolver.IN_PROCESS;
File flags = new File("test-harness/flags");
ObjectMapper objectMapper = new ObjectMapper();
Object merged = new Object();
for (File listFile : Objects.requireNonNull(flags.listFiles())) {
ObjectReader updater = objectMapper.readerForUpdating(merged);
merged = updater.readValue(listFile, Object.class);
}
Path offlinePath = Files.createTempFile("flags", ".json");
objectMapper.writeValue(offlinePath.toFile(), merged);

state.builder
.port(UNAVAILABLE_PORT)
.offlineFlagSourcePath(offlinePath.toAbsolutePath().toString());
break;

default:
this.state.providerType = ProviderType.DEFAULT;
state.builder.port(container.getPort(State.resolverType));
if (State.resolverType == Config.Resolver.FILE) {

state.builder
.port(UNAVAILABLE_PORT)
.offlineFlagSourcePath(sharedTempDir
.resolve("allFlags.json")
.toAbsolutePath()
.toString());
} else {
state.builder.port(container.getPort(State.resolverType));
}
break;
}
aepfli marked this conversation as resolved.
Show resolved Hide resolved
when().post("http://" + container.getLaunchpadUrl() + "/start?config={config}", flagdConfig)
.then()
.statusCode(200);

// giving flagd a little time to start
Thread.sleep(100);
Thread.sleep(30);
FeatureProvider provider =
new FlagdProvider(state.builder.resolverType(State.resolverType).build());

Expand Down Expand Up @@ -159,10 +155,9 @@ public void the_connection_is_lost_for(int seconds) throws InterruptedException

@When("the flag was modified")
public void the_flag_was_modded() throws InterruptedException {

when().post("http://" + container.getLaunchpadUrl() + "/change").then().statusCode(200);

// we might be too fast in the execution
Thread.sleep(100);
Thread.sleep(1000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public static Object convert(String value, String type) throws ClassNotFoundExce
return Config.Resolver.IN_PROCESS;
case "rpc":
return Config.Resolver.RPC;
case "file":
return Config.Resolver.FILE;
default:
throw new RuntimeException("Unknown resolver type: " + value);
}
Expand Down
Loading