From a14931fd7907915face2992de3e6deeefd1092e8 Mon Sep 17 00:00:00 2001 From: Simon Schrottner Date: Thu, 16 Jan 2025 19:57:39 +0100 Subject: [PATCH] fixup: fixing synchronization in a strange way Signed-off-by: Simon Schrottner --- .gitmodules | 2 +- .../providers/flagd/FlagdProvider.java | 65 ++++++++++--------- .../connector/grpc/GrpcStreamConnector.java | 4 +- .../providers/flagd/e2e/RunInProcessTest.java | 2 +- providers/flagd/test-harness | 2 +- 5 files changed, 39 insertions(+), 36 deletions(-) diff --git a/.gitmodules b/.gitmodules index b302279b9..619d4fff1 100644 --- a/.gitmodules +++ b/.gitmodules @@ -4,7 +4,7 @@ [submodule "providers/flagd/test-harness"] path = providers/flagd/test-harness url = https://github.com/open-feature/test-harness.git - branch = v1.1.0 + branch = v1.1.1 [submodule "providers/flagd/spec"] path = providers/flagd/spec url = https://github.com/open-feature/spec.git diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java index 94acbac1a..c01c33e91 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java @@ -41,6 +41,7 @@ public class FlagdProvider extends EventProvider { private volatile EvaluationContext enrichedContext = new ImmutableContext(); private final List hooks = new ArrayList<>(); private volatile ProviderEvent previousEvent = null; + private final Object eventLock; /** * An executor service responsible for emitting {@link ProviderEvent#PROVIDER_ERROR} after the provider went @@ -97,6 +98,7 @@ public FlagdProvider(final FlagdOptions options) { this.errorExecutor = Executors.newSingleThreadScheduledExecutor(); this.gracePeriod = options.getRetryGracePeriod(); this.deadline = options.getDeadline(); + this.eventLock = new Object(); } @Override @@ -188,39 +190,42 @@ EvaluationContext getEnrichedContext() { } @SuppressWarnings("checkstyle:fallthrough") - private synchronized void onProviderEvent(FlagdProviderEvent flagdProviderEvent) { + private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) { - syncMetadata = flagdProviderEvent.getSyncMetadata(); - if (flagdProviderEvent.getSyncMetadata() != null) { - enrichedContext = contextEnricher.apply(flagdProviderEvent.getSyncMetadata()); - } + synchronized (eventLock) { + log.info("FlagdProviderEvent: {}", flagdProviderEvent); + syncMetadata = flagdProviderEvent.getSyncMetadata(); + if (flagdProviderEvent.getSyncMetadata() != null) { + enrichedContext = contextEnricher.apply(flagdProviderEvent.getSyncMetadata()); + } - /* - We only use Error and Ready as previous states. - As error will first be emitted as Stale, and only turns after a while into an emitted Error. - Ready is needed, as the InProcessResolver does not have a dedicated ready event, hence we need to - forward a configuration changed to the ready, if we are not in the ready state. - */ - switch (flagdProviderEvent.getEvent()) { - case PROVIDER_CONFIGURATION_CHANGED: - if (previousEvent == ProviderEvent.PROVIDER_READY) { - onConfigurationChanged(flagdProviderEvent); + /* + We only use Error and Ready as previous states. + As error will first be emitted as Stale, and only turns after a while into an emitted Error. + Ready is needed, as the InProcessResolver does not have a dedicated ready event, hence we need to + forward a configuration changed to the ready, if we are not in the ready state. + */ + switch (flagdProviderEvent.getEvent()) { + case PROVIDER_CONFIGURATION_CHANGED: + if (previousEvent == ProviderEvent.PROVIDER_READY) { + onConfigurationChanged(flagdProviderEvent); + break; + } + // intentional fall through, a not-ready change will trigger a ready. + case PROVIDER_READY: + onReady(); + previousEvent = ProviderEvent.PROVIDER_READY; break; - } - // intentional fall through, a not-ready change will trigger a ready. - case PROVIDER_READY: - onReady(); - previousEvent = ProviderEvent.PROVIDER_READY; - break; - case PROVIDER_ERROR: - if (previousEvent != ProviderEvent.PROVIDER_ERROR) { - onError(); - } - previousEvent = ProviderEvent.PROVIDER_ERROR; - break; - default: - log.info("Unknown event {}", flagdProviderEvent.getEvent()); + case PROVIDER_ERROR: + if (previousEvent != ProviderEvent.PROVIDER_ERROR) { + onError(); + } + previousEvent = ProviderEvent.PROVIDER_ERROR; + break; + default: + log.info("Unknown event {}", flagdProviderEvent.getEvent()); + } } } @@ -258,7 +263,7 @@ private void onError() { if (!errorExecutor.isShutdown()) { errorTask = errorExecutor.schedule( () -> { - if(previousEvent == ProviderEvent.PROVIDER_ERROR) { + if (previousEvent == ProviderEvent.PROVIDER_ERROR) { log.debug( "Provider did not reconnect successfully within {}s. Emit ERROR event...", gracePeriod); diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java index 18a2300d9..832c8a487 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java @@ -109,9 +109,7 @@ void observeEventStream(final BlockingQueue writeTo, final AtomicB try (CancellableContext context = Context.current().withCancellation()) { try { - metadataResponse = grpcConnector - .getResolver() - .getMetadata(metadataRequest.build()); + metadataResponse = grpcConnector.getResolver().getMetadata(metadataRequest.build()); } catch (Exception e) { // the chances this call fails but the syncRequest does not are slim // it could be that the server doesn't implement this RPC diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunInProcessTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunInProcessTest.java index bc1367d96..e0edef240 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunInProcessTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunInProcessTest.java @@ -28,7 +28,7 @@ @ConfigurationParameter(key = GLUE_PROPERTY_NAME, value = "dev.openfeature.contrib.providers.flagd.e2e.steps") @ConfigurationParameter(key = OBJECT_FACTORY_PROPERTY_NAME, value = "io.cucumber.picocontainer.PicoFactory") @IncludeTags("in-process") -@ExcludeTags({"unixsocket", "customCert", "targetURI"}) +@ExcludeTags({"unixsocket", "targetURI"}) @Testcontainers public class RunInProcessTest { diff --git a/providers/flagd/test-harness b/providers/flagd/test-harness index dd6b67067..fc7867922 160000 --- a/providers/flagd/test-harness +++ b/providers/flagd/test-harness @@ -1 +1 @@ -Subproject commit dd6b67067a3c146f5b7dd414637905f7ec369901 +Subproject commit fc786792273b7984911dc3bcb7b47489f261ba57