Skip to content

Commit

Permalink
fixup: fixing nit, and adding information for fallthrough
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Schrottner <[email protected]>
  • Loading branch information
aepfli committed Jan 16, 2025
1 parent 853727f commit 338f367
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class FlagdProvider extends EventProvider {
/**
* A scheduled task for emitting {@link ProviderEvent#PROVIDER_ERROR}.
*/
private ScheduledFuture<?> reconnectTask;
private ScheduledFuture<?> errorTask;

/**
* The grace period in milliseconds to wait after {@link ProviderEvent#PROVIDER_STALE} before emitting a
Expand Down Expand Up @@ -114,7 +114,7 @@ public synchronized void initialize(EvaluationContext evaluationContext) throws
// block till ready - this works with deadline fine for rpc, but with in_process we also need to take parsing
// into the equation
// TODO: evaluate where we are losing time, so we can remove this magic number - follow up
Util.busyWaitAndCheck(this.deadline + 500, () -> initialized);
Util.busyWaitAndCheck(this.deadline + 200, () -> initialized);
}

@Override
Expand Down Expand Up @@ -188,7 +188,7 @@ EvaluationContext getEnrichedContext() {
}

@SuppressWarnings("checkstyle:fallthrough")
private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {
private synchronized void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {

syncMetadata = flagdProviderEvent.getSyncMetadata();
if (flagdProviderEvent.getSyncMetadata() != null) {
Expand Down Expand Up @@ -236,8 +236,8 @@ private void onReady() {
initialized = true;
log.info("initialized FlagdProvider");
}
if (reconnectTask != null && !reconnectTask.isCancelled()) {
reconnectTask.cancel(false);
if (errorTask != null && !errorTask.isCancelled()) {
errorTask.cancel(false);
log.debug("Reconnection task cancelled as connection became READY.");
}
this.emitProviderReady(
Expand All @@ -251,19 +251,22 @@ private void onError() {
.message("there has been an error")
.build());

if (reconnectTask != null && !reconnectTask.isCancelled()) {
reconnectTask.cancel(false);
if (errorTask != null && !errorTask.isCancelled()) {
errorTask.cancel(false);
}

if (!errorExecutor.isShutdown()) {
reconnectTask = errorExecutor.schedule(
errorTask = errorExecutor.schedule(
() -> {
log.debug(
"Provider did not reconnect successfully within {}s. Emit ERROR event...", gracePeriod);
flagResolver.onError();
this.emitProviderError(ProviderEventDetails.builder()
.message("there has been an error")
.build());
if(previousEvent == ProviderEvent.PROVIDER_ERROR) {
log.debug(
"Provider did not reconnect successfully within {}s. Emit ERROR event...",
gracePeriod);
flagResolver.onError();
this.emitProviderError(ProviderEventDetails.builder()
.message("there has been an error")
.build());
}
},
gracePeriod,
TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import io.grpc.Context.CancellableContext;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -121,6 +120,7 @@ void observeEventStream(final BlockingQueue<QueuePayload> writeTo, final AtomicB
metadataException = e;
}

log.info("stream");
while (!shutdown.get()) {
final GrpcResponseModel response = streamReceiver.take();
if (response.isComplete()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
@Suite
@IncludeEngines("cucumber")
@SelectDirectories("test-harness/gherkin")
// if you want to run just one feature file, use the following line instead of @SelectDirectories
// @SelectFile("test-harness/gherkin/connection.feature")
@ConfigurationParameter(key = PLUGIN_PROPERTY_NAME, value = "pretty")
@ConfigurationParameter(key = GLUE_PROPERTY_NAME, value = "dev.openfeature.contrib.providers.flagd.e2e.steps")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
@Suite
@IncludeEngines("cucumber")
@SelectDirectories("test-harness/gherkin")
// if you want to run just one feature file, use the following line instead of @SelectDirectories
// @SelectFile("test-harness/gherkin/rpc-caching.feature")
@ConfigurationParameter(key = PLUGIN_PROPERTY_NAME, value = "pretty")
@ConfigurationParameter(key = GLUE_PROPERTY_NAME, value = "dev.openfeature.contrib.providers.flagd.e2e.steps")
Expand Down

0 comments on commit 338f367

Please sign in to comment.