Skip to content

Commit

Permalink
Merge branch 'feat/grpc-reconnect-in-process' into feat/gherkin-rework
Browse files Browse the repository at this point in the history
  • Loading branch information
aepfli committed Jan 13, 2025
2 parents 0a3b414 + 6b9b10d commit c33d0ce
Show file tree
Hide file tree
Showing 26 changed files with 177 additions and 1,222 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void error(HookContext ctx, Exception error, Map hints) {
}

@Override
public void finallyAfter(HookContext ctx, Map hints) {
public void finallyAfter(HookContext ctx, FlagEvaluationDetails details, Map hints) {
activeFlagEvaluationsCounter.add(-1, Attributes.of(flagKeyAttributeKey, ctx.getFlagKey()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public void finally_stage_validation() {
final MetricsHook metricHook = new MetricsHook(telemetryExtension.getOpenTelemetry());

// when
metricHook.finallyAfter(commonHookContext, null);
metricHook.finallyAfter(commonHookContext, null, null);
List<MetricData> metrics = telemetryExtension.getMetrics();

// then
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@
<!-- this can be overriden in child POMs to support specific SDK requirements -->
<groupId>dev.openfeature</groupId>
<artifactId>sdk</artifactId>
<!-- 1.12 <= v < 2.0 -->
<version>[1.12,2.0)</version>
<!-- 1.14 <= v < 2.0 (excluding 2.0 pre-releases)-->
<version>[1.14,1.99999)</version>
<!-- use the version provided at runtime -->
<scope>provided</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -32,65 +34,58 @@ public static void monitorChannelState(
ConnectivityState currentState = channel.getState(true);
log.info("Channel state changed to: {}", currentState);
if (currentState == ConnectivityState.READY) {
onConnectionReady.run();
if (onConnectionReady != null) {
onConnectionReady.run();
} else {
log.debug("onConnectionReady is null");
}
} else if (currentState == ConnectivityState.TRANSIENT_FAILURE
|| currentState == ConnectivityState.SHUTDOWN) {
onConnectionLost.run();
if (onConnectionLost != null) {
onConnectionLost.run();
} else {
log.debug("onConnectionLost is null");
}
}
// Re-register the state monitor to watch for the next state transition.
monitorChannelState(currentState, channel, onConnectionReady, onConnectionLost);
});
}

/**
* Waits for the channel to reach a desired state within a specified timeout period.
* Waits for the channel to reach the desired connectivity state within the specified timeout.
*
* @param channel the ManagedChannel to monitor.
* @param desiredState the ConnectivityState to wait for.
* @param connectCallback callback invoked when the desired state is reached.
* @param timeout the maximum amount of time to wait.
* @param unit the time unit of the timeout.
* @throws InterruptedException if the current thread is interrupted while waiting.
* @param desiredState the desired {@link ConnectivityState} to wait for
* @param channel the {@link ManagedChannel} to monitor
* @param connectCallback the {@link Runnable} to execute when the desired state is reached
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @throws InterruptedException if the current thread is interrupted while waiting
* @throws GeneralError if the desired state is not reached within the timeout
*/
public static void waitForDesiredState(
ManagedChannel channel,
ConnectivityState desiredState,
Runnable connectCallback,
long timeout,
TimeUnit unit)
throws InterruptedException {
waitForDesiredState(channel, desiredState, connectCallback, new CountDownLatch(1), timeout, unit);
}

private static void waitForDesiredState(
ManagedChannel channel,
ConnectivityState desiredState,
Runnable connectCallback,
CountDownLatch latch,
long timeout,
TimeUnit unit)
throws InterruptedException {
channel.notifyWhenStateChanged(ConnectivityState.SHUTDOWN, () -> {
try {
ConnectivityState state = channel.getState(true);
log.debug("Channel state changed to: {}", state);
CountDownLatch latch = new CountDownLatch(1);

if (state == desiredState) {
connectCallback.run();
latch.countDown();
return;
}
waitForDesiredState(channel, desiredState, connectCallback, latch, timeout, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Thread interrupted while waiting for desired state", e);
} catch (Exception e) {
log.error("Error occurred while waiting for desired state", e);
Runnable waitForStateTask = () -> {
ConnectivityState currentState = channel.getState(true);
if (currentState == desiredState) {
connectCallback.run();
latch.countDown();
}
});
};

ScheduledFuture<?> scheduledFuture = Executors.newSingleThreadScheduledExecutor()
.scheduleWithFixedDelay(waitForStateTask, 0, 100, TimeUnit.MILLISECONDS);

// Await the latch or timeout for the state change
if (!latch.await(timeout, unit)) {
boolean success = latch.await(timeout, unit);
scheduledFuture.cancel(true);
if (!success) {
throw new GeneralError(String.format(
"Deadline exceeded. Condition did not complete within the %d " + "deadline", timeout));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
package dev.openfeature.contrib.providers.flagd.resolver.grpc;
package dev.openfeature.contrib.providers.flagd.resolver.common;

import com.google.common.annotations.VisibleForTesting;
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder;
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelMonitor;
import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionEvent;
import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionState;
import dev.openfeature.sdk.ImmutableStructure;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
Expand Down Expand Up @@ -125,8 +120,7 @@ public GrpcConnector(
* @param onConnectionEvent a consumer to handle connection events
* @param eventStreamObserver a consumer to handle the event stream
*/
@VisibleForTesting
GrpcConnector(
public GrpcConnector(
final FlagdOptions options,
final Function<ManagedChannel, T> stub,
final Function<ManagedChannel, K> blockingStub,
Expand All @@ -143,7 +137,7 @@ public GrpcConnector(
public void initialize() throws Exception {
log.info("Initializing GRPC connection...");
ChannelMonitor.waitForDesiredState(
channel, ConnectivityState.READY, this::onInitialConnect, deadline, TimeUnit.MILLISECONDS);
ConnectivityState.READY, channel, this::onInitialConnect, deadline, TimeUnit.MILLISECONDS);
ChannelMonitor.monitorChannelState(ConnectivityState.READY, channel, this::onReady, this::onConnectionLost);
}

Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit c33d0ce

Please sign in to comment.