Skip to content

Commit

Permalink
fixup: fixing synchronization in a strange way
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Schrottner <[email protected]>
  • Loading branch information
aepfli committed Jan 17, 2025
1 parent 338f367 commit a14931f
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 36 deletions.
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
[submodule "providers/flagd/test-harness"]
path = providers/flagd/test-harness
url = https://github.com/open-feature/test-harness.git
branch = v1.1.0
branch = v1.1.1
[submodule "providers/flagd/spec"]
path = providers/flagd/spec
url = https://github.com/open-feature/spec.git
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class FlagdProvider extends EventProvider {
private volatile EvaluationContext enrichedContext = new ImmutableContext();
private final List<Hook> hooks = new ArrayList<>();
private volatile ProviderEvent previousEvent = null;
private final Object eventLock;

/**
* An executor service responsible for emitting {@link ProviderEvent#PROVIDER_ERROR} after the provider went
Expand Down Expand Up @@ -97,6 +98,7 @@ public FlagdProvider(final FlagdOptions options) {
this.errorExecutor = Executors.newSingleThreadScheduledExecutor();
this.gracePeriod = options.getRetryGracePeriod();
this.deadline = options.getDeadline();
this.eventLock = new Object();
}

@Override
Expand Down Expand Up @@ -188,39 +190,42 @@ EvaluationContext getEnrichedContext() {
}

@SuppressWarnings("checkstyle:fallthrough")
private synchronized void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {
private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {

syncMetadata = flagdProviderEvent.getSyncMetadata();
if (flagdProviderEvent.getSyncMetadata() != null) {
enrichedContext = contextEnricher.apply(flagdProviderEvent.getSyncMetadata());
}
synchronized (eventLock) {
log.info("FlagdProviderEvent: {}", flagdProviderEvent);
syncMetadata = flagdProviderEvent.getSyncMetadata();
if (flagdProviderEvent.getSyncMetadata() != null) {
enrichedContext = contextEnricher.apply(flagdProviderEvent.getSyncMetadata());
}

/*
We only use Error and Ready as previous states.
As error will first be emitted as Stale, and only turns after a while into an emitted Error.
Ready is needed, as the InProcessResolver does not have a dedicated ready event, hence we need to
forward a configuration changed to the ready, if we are not in the ready state.
*/
switch (flagdProviderEvent.getEvent()) {
case PROVIDER_CONFIGURATION_CHANGED:
if (previousEvent == ProviderEvent.PROVIDER_READY) {
onConfigurationChanged(flagdProviderEvent);
/*
We only use Error and Ready as previous states.
As error will first be emitted as Stale, and only turns after a while into an emitted Error.
Ready is needed, as the InProcessResolver does not have a dedicated ready event, hence we need to
forward a configuration changed to the ready, if we are not in the ready state.
*/
switch (flagdProviderEvent.getEvent()) {
case PROVIDER_CONFIGURATION_CHANGED:
if (previousEvent == ProviderEvent.PROVIDER_READY) {
onConfigurationChanged(flagdProviderEvent);
break;
}
// intentional fall through, a not-ready change will trigger a ready.
case PROVIDER_READY:
onReady();
previousEvent = ProviderEvent.PROVIDER_READY;
break;
}
// intentional fall through, a not-ready change will trigger a ready.
case PROVIDER_READY:
onReady();
previousEvent = ProviderEvent.PROVIDER_READY;
break;

case PROVIDER_ERROR:
if (previousEvent != ProviderEvent.PROVIDER_ERROR) {
onError();
}
previousEvent = ProviderEvent.PROVIDER_ERROR;
break;
default:
log.info("Unknown event {}", flagdProviderEvent.getEvent());
case PROVIDER_ERROR:
if (previousEvent != ProviderEvent.PROVIDER_ERROR) {
onError();
}
previousEvent = ProviderEvent.PROVIDER_ERROR;
break;
default:
log.info("Unknown event {}", flagdProviderEvent.getEvent());
}
}
}

Expand Down Expand Up @@ -258,7 +263,7 @@ private void onError() {
if (!errorExecutor.isShutdown()) {
errorTask = errorExecutor.schedule(
() -> {
if(previousEvent == ProviderEvent.PROVIDER_ERROR) {
if (previousEvent == ProviderEvent.PROVIDER_ERROR) {
log.debug(
"Provider did not reconnect successfully within {}s. Emit ERROR event...",
gracePeriod);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,7 @@ void observeEventStream(final BlockingQueue<QueuePayload> writeTo, final AtomicB
try (CancellableContext context = Context.current().withCancellation()) {

try {
metadataResponse = grpcConnector
.getResolver()
.getMetadata(metadataRequest.build());
metadataResponse = grpcConnector.getResolver().getMetadata(metadataRequest.build());
} catch (Exception e) {
// the chances this call fails but the syncRequest does not are slim
// it could be that the server doesn't implement this RPC
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
@ConfigurationParameter(key = GLUE_PROPERTY_NAME, value = "dev.openfeature.contrib.providers.flagd.e2e.steps")
@ConfigurationParameter(key = OBJECT_FACTORY_PROPERTY_NAME, value = "io.cucumber.picocontainer.PicoFactory")
@IncludeTags("in-process")
@ExcludeTags({"unixsocket", "customCert", "targetURI"})
@ExcludeTags({"unixsocket", "targetURI"})
@Testcontainers
public class RunInProcessTest {

Expand Down
2 changes: 1 addition & 1 deletion providers/flagd/test-harness

0 comments on commit a14931f

Please sign in to comment.