From 0c148a6461f1907afbb8978a5703b185b175fdde Mon Sep 17 00:00:00 2001 From: Todd Baert Date: Wed, 25 Sep 2024 12:06:03 -0400 Subject: [PATCH] fixup: use deadline, pr feedback Signed-off-by: Todd Baert --- .../resolver/grpc/EventStreamObserver.java | 10 ++-- .../flagd/resolver/grpc/GrpcConnector.java | 6 +-- .../connector/grpc/GrpcStreamConnector.java | 14 ++--- .../grpc/GrpcStreamConnectorTest.java | 54 +++++++++++-------- 4 files changed, 47 insertions(+), 37 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserver.java index 856cf0b63..4eb9ce462 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserver.java @@ -31,14 +31,14 @@ class EventStreamObserver implements StreamObserver { /** * Create a gRPC stream that get notified about flag changes. * - * @param sync synchronization object from caller - * @param cache cache to update - * @param onResponse lambda to call to handle the response + * @param sync synchronization object from caller + * @param cache cache to update + * @param onConnectionEvent lambda to call to handle the response */ - EventStreamObserver(Object sync, Cache cache, BiConsumer> onResponse) { + EventStreamObserver(Object sync, Cache cache, BiConsumer> onConnectionEvent) { this.sync = sync; this.cache = cache; - this.onConnectionEvent = onResponse; + this.onConnectionEvent = onConnectionEvent; } @Override diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnector.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnector.java index 7f446dfae..9f99bae1d 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnector.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnector.java @@ -125,7 +125,7 @@ public ServiceGrpc.ServiceBlockingStub getResolver() { private void observeEventStream() { while (this.eventStreamAttempt <= this.maxEventStreamRetries) { final StreamObserver responseObserver = new EventStreamObserver(sync, this.cache, - this::grpconConnectionEvent); + this::onConnectionEvent); this.serviceStub.eventStream(EventStreamRequest.getDefaultInstance(), responseObserver); try { @@ -156,10 +156,10 @@ private void observeEventStream() { } log.error("failed to connect to event stream, exhausted retries"); - this.grpconConnectionEvent(false, Collections.emptyList()); + this.onConnectionEvent(false, Collections.emptyList()); } - private void grpcOnConnectionEvent(final boolean connected, final List changedFlags) { + private void onConnectionEvent(final boolean connected, final List changedFlags) { // reset reconnection states if (connected) { this.eventStreamAttempt = 1; diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java index e33c0c4c8..182712043 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java @@ -35,15 +35,12 @@ "EI_EXPOSE_REP" }, justification = "Random is used to generate a variation & flag configurations require exposing") public class GrpcStreamConnector implements Connector { private static final Random RANDOM = new Random(); - private static final int INIT_BACK_OFF = 2 * 1000; private static final int MAX_BACK_OFF = 120 * 1000; - private static final int QUEUE_SIZE = 5; private final AtomicBoolean shutdown = new AtomicBoolean(false); private final BlockingQueue blockingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); - private final ManagedChannel channel; private final FlagSyncServiceStub serviceStub; private final FlagSyncServiceBlockingStub serviceBlockingStub; @@ -69,7 +66,7 @@ public GrpcStreamConnector(final FlagdOptions options) { public void init() { Thread listener = new Thread(() -> { try { - observeEventStream(blockingQueue, shutdown, serviceStub, serviceBlockingStub, selector); + observeEventStream(blockingQueue, shutdown, serviceStub, serviceBlockingStub, selector, deadline); } catch (InterruptedException e) { log.warn("gRPC event stream interrupted, flag configurations are stale", e); Thread.currentThread().interrupt(); @@ -118,7 +115,8 @@ static void observeEventStream(final BlockingQueue writeTo, final AtomicBoolean shutdown, final FlagSyncServiceStub serviceStub, final FlagSyncServiceBlockingStub serviceBlockingStub, - final String selector) + final String selector, + final int deadline) throws InterruptedException { final BlockingQueue streamReceiver = new LinkedBlockingQueue<>(QUEUE_SIZE); @@ -137,9 +135,11 @@ static void observeEventStream(final BlockingQueue writeTo, syncRequest.setSelector(selector); } - serviceStub.syncFlags(syncRequest.build(), new GrpcStreamHandler(streamReceiver)); + serviceStub.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS).syncFlags(syncRequest.build(), + new GrpcStreamHandler(streamReceiver)); try { - GetMetadataResponse metadataResponse = serviceBlockingStub.getMetadata(metadataRequest.build()); + GetMetadataResponse metadataResponse = serviceBlockingStub + .withDeadlineAfter(deadline, TimeUnit.MILLISECONDS).getMetadata(metadataRequest.build()); metadata = convertProtobufMapToStructure(metadataResponse.getMetadata().getFieldsMap()).asObjectMap(); } catch (Exception e) { // the chances this call fails but the syncRequest does not are slim diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnectorTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnectorTest.java index 083915a26..9e7a0a10e 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnectorTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnectorTest.java @@ -1,12 +1,15 @@ package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc; -import static org.junit.Assert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -17,7 +20,6 @@ import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; import com.google.protobuf.Struct; @@ -39,21 +41,25 @@ public void connectionParameters() throws Throwable { // given final FlagdOptions options = FlagdOptions.builder() .selector("selector") + .deadline(1337) .build(); final GrpcStreamConnector connector = new GrpcStreamConnector(options); final FlagSyncServiceStub stubMock = mockStubAndReturn(connector); - + final FlagSyncServiceBlockingStub blockingStubMock = mockBlockingStubAndReturn(connector); final SyncFlagsRequest[] request = new SyncFlagsRequest[1]; - Mockito.doAnswer(invocation -> { + when(stubMock.withDeadlineAfter(anyLong(), any())).thenReturn(stubMock); + doAnswer(invocation -> { request[0] = invocation.getArgument(0, SyncFlagsRequest.class); return null; }).when(stubMock).syncFlags(any(), any()); // when connector.init(); - verify(stubMock, Mockito.timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any()); + verify(stubMock, timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any()); + verify(stubMock).withDeadlineAfter(1337, TimeUnit.MILLISECONDS); + verify(blockingStubMock).withDeadlineAfter(1337, TimeUnit.MILLISECONDS); // then final SyncFlagsRequest flagsRequest = request[0]; @@ -70,17 +76,19 @@ public void grpcConnectionStatus() throws Throwable { final FlagSyncServiceStub stubMock = mockStubAndReturn(connector); final FlagSyncServiceBlockingStub blockingStubMock = mockBlockingStubAndReturn(connector); -final Struct metadata = Struct.newBuilder() - .putFields(key, - com.google.protobuf.Value.newBuilder().setStringValue(val).build()) - .build(); - + final Struct metadata = Struct.newBuilder() + .putFields(key, + com.google.protobuf.Value.newBuilder().setStringValue(val).build()) + .build(); - when(blockingStubMock.getMetadata(any())).thenReturn(GetMetadataResponse.newBuilder().setMetadata(metadata).build()); + when(blockingStubMock.withDeadlineAfter(anyLong(), any())).thenReturn(blockingStubMock); + when(blockingStubMock.getMetadata(any())) + .thenReturn(GetMetadataResponse.newBuilder().setMetadata(metadata).build()); final GrpcStreamHandler[] injectedHandler = new GrpcStreamHandler[1]; - Mockito.doAnswer(invocation -> { + when(stubMock.withDeadlineAfter(anyLong(), any())).thenReturn(stubMock); + doAnswer(invocation -> { injectedHandler[0] = invocation.getArgument(1, GrpcStreamHandler.class); return null; }).when(stubMock).syncFlags(any(), any()); @@ -88,7 +96,7 @@ public void grpcConnectionStatus() throws Throwable { // when connector.init(); // verify and wait for initialization - verify(stubMock, Mockito.timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any()); + verify(stubMock, timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any()); verify(blockingStubMock).getMetadata(any()); // then @@ -133,7 +141,9 @@ public void listenerExitOnShutdown() throws Throwable { final GrpcStreamHandler[] injectedHandler = new GrpcStreamHandler[1]; - Mockito.doAnswer(invocation -> { + when(blockingStubMock.withDeadlineAfter(anyLong(), any())).thenReturn(blockingStubMock); + when(stubMock.withDeadlineAfter(anyLong(), any())).thenReturn(stubMock); + doAnswer(invocation -> { injectedHandler[0] = invocation.getArgument(1, GrpcStreamHandler.class); return null; }).when(stubMock).syncFlags(any(), any()); @@ -141,7 +151,7 @@ public void listenerExitOnShutdown() throws Throwable { // when connector.init(); // verify and wait for initialization - verify(stubMock, Mockito.timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any()); + verify(stubMock, timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any()); verify(blockingStubMock).getMetadata(any()); // then @@ -176,7 +186,7 @@ private static FlagSyncServiceStub mockStubAndReturn(final GrpcStreamConnector c final Field serviceStubField = GrpcStreamConnector.class.getDeclaredField("serviceStub"); serviceStubField.setAccessible(true); - final FlagSyncServiceStub stubMock = Mockito.mock(FlagSyncServiceStub.class); + final FlagSyncServiceStub stubMock = mock(FlagSyncServiceStub.class); serviceStubField.set(connector, stubMock); @@ -184,15 +194,15 @@ private static FlagSyncServiceStub mockStubAndReturn(final GrpcStreamConnector c } private static FlagSyncServiceBlockingStub mockBlockingStubAndReturn(final GrpcStreamConnector connector) - throws Throwable { - final Field blockingStubField = GrpcStreamConnector.class.getDeclaredField("serviceBlockingStub"); - blockingStubField.setAccessible(true); + throws Throwable { + final Field blockingStubField = GrpcStreamConnector.class.getDeclaredField("serviceBlockingStub"); + blockingStubField.setAccessible(true); - final FlagSyncServiceBlockingStub blockingStubMock = Mockito.mock(FlagSyncServiceBlockingStub.class); + final FlagSyncServiceBlockingStub blockingStubMock = mock(FlagSyncServiceBlockingStub.class); - blockingStubField.set(connector, blockingStubMock); + blockingStubField.set(connector, blockingStubMock); - return blockingStubMock; + return blockingStubMock; } }