From 338f36709662ebdd93bb3f9b4e73d9cc1c9856df Mon Sep 17 00:00:00 2001 From: Simon Schrottner Date: Wed, 15 Jan 2025 21:07:24 +0100 Subject: [PATCH] fixup: fixing nit, and adding information for fallthrough Signed-off-by: Simon Schrottner --- .../providers/flagd/FlagdProvider.java | 31 ++++++++++--------- .../connector/grpc/GrpcStreamConnector.java | 2 +- .../providers/flagd/e2e/RunInProcessTest.java | 1 + .../providers/flagd/e2e/RunRpcTest.java | 1 + 4 files changed, 20 insertions(+), 15 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java index 7f517a7c1..94acbac1a 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java @@ -51,7 +51,7 @@ public class FlagdProvider extends EventProvider { /** * A scheduled task for emitting {@link ProviderEvent#PROVIDER_ERROR}. */ - private ScheduledFuture reconnectTask; + private ScheduledFuture errorTask; /** * The grace period in milliseconds to wait after {@link ProviderEvent#PROVIDER_STALE} before emitting a @@ -114,7 +114,7 @@ public synchronized void initialize(EvaluationContext evaluationContext) throws // block till ready - this works with deadline fine for rpc, but with in_process we also need to take parsing // into the equation // TODO: evaluate where we are losing time, so we can remove this magic number - follow up - Util.busyWaitAndCheck(this.deadline + 500, () -> initialized); + Util.busyWaitAndCheck(this.deadline + 200, () -> initialized); } @Override @@ -188,7 +188,7 @@ EvaluationContext getEnrichedContext() { } @SuppressWarnings("checkstyle:fallthrough") - private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) { + private synchronized void onProviderEvent(FlagdProviderEvent flagdProviderEvent) { syncMetadata = flagdProviderEvent.getSyncMetadata(); if (flagdProviderEvent.getSyncMetadata() != null) { @@ -236,8 +236,8 @@ private void onReady() { initialized = true; log.info("initialized FlagdProvider"); } - if (reconnectTask != null && !reconnectTask.isCancelled()) { - reconnectTask.cancel(false); + if (errorTask != null && !errorTask.isCancelled()) { + errorTask.cancel(false); log.debug("Reconnection task cancelled as connection became READY."); } this.emitProviderReady( @@ -251,19 +251,22 @@ private void onError() { .message("there has been an error") .build()); - if (reconnectTask != null && !reconnectTask.isCancelled()) { - reconnectTask.cancel(false); + if (errorTask != null && !errorTask.isCancelled()) { + errorTask.cancel(false); } if (!errorExecutor.isShutdown()) { - reconnectTask = errorExecutor.schedule( + errorTask = errorExecutor.schedule( () -> { - log.debug( - "Provider did not reconnect successfully within {}s. Emit ERROR event...", gracePeriod); - flagResolver.onError(); - this.emitProviderError(ProviderEventDetails.builder() - .message("there has been an error") - .build()); + if(previousEvent == ProviderEvent.PROVIDER_ERROR) { + log.debug( + "Provider did not reconnect successfully within {}s. Emit ERROR event...", + gracePeriod); + flagResolver.onError(); + this.emitProviderError(ProviderEventDetails.builder() + .message("there has been an error") + .build()); + } }, gracePeriod, TimeUnit.SECONDS); diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java index 7e7637095..18a2300d9 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java @@ -16,7 +16,6 @@ import io.grpc.Context.CancellableContext; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import lombok.extern.slf4j.Slf4j; @@ -121,6 +120,7 @@ void observeEventStream(final BlockingQueue writeTo, final AtomicB metadataException = e; } + log.info("stream"); while (!shutdown.get()) { final GrpcResponseModel response = streamReceiver.take(); if (response.isComplete()) { diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunInProcessTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunInProcessTest.java index abaa2595a..bc1367d96 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunInProcessTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunInProcessTest.java @@ -22,6 +22,7 @@ @Suite @IncludeEngines("cucumber") @SelectDirectories("test-harness/gherkin") +// if you want to run just one feature file, use the following line instead of @SelectDirectories // @SelectFile("test-harness/gherkin/connection.feature") @ConfigurationParameter(key = PLUGIN_PROPERTY_NAME, value = "pretty") @ConfigurationParameter(key = GLUE_PROPERTY_NAME, value = "dev.openfeature.contrib.providers.flagd.e2e.steps") diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunRpcTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunRpcTest.java index bcf6d016a..bc649ddeb 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunRpcTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunRpcTest.java @@ -22,6 +22,7 @@ @Suite @IncludeEngines("cucumber") @SelectDirectories("test-harness/gherkin") +// if you want to run just one feature file, use the following line instead of @SelectDirectories // @SelectFile("test-harness/gherkin/rpc-caching.feature") @ConfigurationParameter(key = PLUGIN_PROPERTY_NAME, value = "pretty") @ConfigurationParameter(key = GLUE_PROPERTY_NAME, value = "dev.openfeature.contrib.providers.flagd.e2e.steps")