From 0725139ca1415c09f6b807486d655d4556bd4fd2 Mon Sep 17 00:00:00 2001 From: Todd Baert Date: Thu, 26 Sep 2024 14:38:39 -0400 Subject: [PATCH] fixup: remove stream deadline Signed-off-by: Todd Baert --- .../connector/grpc/GrpcStreamConnector.java | 3 +- .../e2e/process/FlagdInProcessSetup.java | 1 + .../process/FlagdInProcessSetup.java | 2 ++ .../e2e/reconnect/rpc/FlagdRpcSetup.java | 2 ++ .../resolver/grpc/GrpcConnectorTest.java | 32 +++++++++++++++---- .../grpc/GrpcStreamConnectorTest.java | 3 -- 6 files changed, 31 insertions(+), 12 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java index 182712043..9a0e8048d 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java @@ -135,8 +135,7 @@ static void observeEventStream(final BlockingQueue writeTo, syncRequest.setSelector(selector); } - serviceStub.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS).syncFlags(syncRequest.build(), - new GrpcStreamHandler(streamReceiver)); + serviceStub.syncFlags(syncRequest.build(), new GrpcStreamHandler(streamReceiver)); try { GetMetadataResponse metadataResponse = serviceBlockingStub .withDeadlineAfter(deadline, TimeUnit.MILLISECONDS).getMetadata(metadataRequest.build()); diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/process/FlagdInProcessSetup.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/process/FlagdInProcessSetup.java index a101a631d..6217e4830 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/process/FlagdInProcessSetup.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/process/FlagdInProcessSetup.java @@ -26,6 +26,7 @@ public static void setup() throws InterruptedException { flagdContainer.start(); FlagdInProcessSetup.provider = new FlagdProvider(FlagdOptions.builder() .resolverType(Config.Resolver.IN_PROCESS) + // set a generous deadline, to prevent timeouts in actions .deadline(3000) .port(flagdContainer.getFirstMappedPort()) .build()); diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/reconnect/process/FlagdInProcessSetup.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/reconnect/process/FlagdInProcessSetup.java index 3f70a6a52..1140c04c7 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/reconnect/process/FlagdInProcessSetup.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/reconnect/process/FlagdInProcessSetup.java @@ -24,6 +24,8 @@ public static void setup() throws InterruptedException { FeatureProvider workingProvider = new FlagdProvider(FlagdOptions.builder() .resolverType(Config.Resolver.IN_PROCESS) .port(flagdContainer.getFirstMappedPort()) + // set a generous deadline, to prevent timeouts in actions + .deadline(3000) .build()); StepDefinitions.setUnstableProvider(workingProvider); diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/reconnect/rpc/FlagdRpcSetup.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/reconnect/rpc/FlagdRpcSetup.java index 88ca201ec..e323cfc7c 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/reconnect/rpc/FlagdRpcSetup.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/reconnect/rpc/FlagdRpcSetup.java @@ -28,6 +28,8 @@ public static void setup() throws InterruptedException { FeatureProvider workingProvider = new FlagdProvider(FlagdOptions.builder() .resolverType(Config.Resolver.RPC) .port(flagdContainer.getFirstMappedPort()) + // set a generous deadline, to prevent timeouts in actions + .deadline(3000) .cacheType(CacheType.DISABLED.getValue()) .build()); StepDefinitions.setUnstableProvider(workingProvider); diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnectorTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnectorTest.java index 9e2fb4f40..59e7b6897 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnectorTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnectorTest.java @@ -130,15 +130,33 @@ void initialization_fail_with_timeout() throws Exception { final Cache cache = new Cache("disabled", 0); final ServiceGrpc.ServiceStub mockStub = mock(ServiceGrpc.ServiceStub.class); Consumer onConnectionEvent = mock(Consumer.class); - doAnswer(invocation -> null).when(mockStub).eventStream(any(), any()); + doAnswer((InvocationOnMock invocation) -> { + EventStreamObserver eventStreamObserver = (EventStreamObserver) invocation.getArgument(1); + eventStreamObserver + .onError(new Exception("fake")); + return null; + }).when(mockStub).eventStream(any(), any()); + + try (MockedStatic mockStaticService = mockStatic(ServiceGrpc.class)) { + mockStaticService.when(() -> ServiceGrpc.newStub(any())) + .thenReturn(mockStub); + + // pass true in connected lambda + final GrpcConnector connector = new GrpcConnector(FlagdOptions.builder().build(), cache, () -> { + try { + Thread.sleep(100); + return true; + } catch (Exception e) { + } + return false; - final GrpcConnector connector = new GrpcConnector(FlagdOptions.builder().build(), cache, () -> false, - onConnectionEvent); + }, + onConnectionEvent); - // assert throws - assertThrows(RuntimeException.class, connector::initialize); - // assert that onConnectionEvent is not connected - verify(onConnectionEvent).accept(argThat(arg -> !arg.isConnected())); + assertDoesNotThrow(connector::initialize); + // assert that onConnectionEvent is connected + verify(onConnectionEvent).accept(argThat(arg -> !arg.isConnected())); + } } @Test diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnectorTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnectorTest.java index 9e7a0a10e..84de5868e 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnectorTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnectorTest.java @@ -49,7 +49,6 @@ public void connectionParameters() throws Throwable { final FlagSyncServiceBlockingStub blockingStubMock = mockBlockingStubAndReturn(connector); final SyncFlagsRequest[] request = new SyncFlagsRequest[1]; - when(stubMock.withDeadlineAfter(anyLong(), any())).thenReturn(stubMock); doAnswer(invocation -> { request[0] = invocation.getArgument(0, SyncFlagsRequest.class); return null; @@ -58,7 +57,6 @@ public void connectionParameters() throws Throwable { // when connector.init(); verify(stubMock, timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any()); - verify(stubMock).withDeadlineAfter(1337, TimeUnit.MILLISECONDS); verify(blockingStubMock).withDeadlineAfter(1337, TimeUnit.MILLISECONDS); // then @@ -87,7 +85,6 @@ public void grpcConnectionStatus() throws Throwable { final GrpcStreamHandler[] injectedHandler = new GrpcStreamHandler[1]; - when(stubMock.withDeadlineAfter(anyLong(), any())).thenReturn(stubMock); doAnswer(invocation -> { injectedHandler[0] = invocation.getArgument(1, GrpcStreamHandler.class); return null;