Skip to content

Commit

Permalink
fixup: remove stream deadline
Browse files Browse the repository at this point in the history
Signed-off-by: Todd Baert <[email protected]>
  • Loading branch information
toddbaert committed Sep 26, 2024
1 parent 2c94fcf commit 0725139
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ static void observeEventStream(final BlockingQueue<QueuePayload> writeTo,
syncRequest.setSelector(selector);
}

serviceStub.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS).syncFlags(syncRequest.build(),
new GrpcStreamHandler(streamReceiver));
serviceStub.syncFlags(syncRequest.build(), new GrpcStreamHandler(streamReceiver));
try {
GetMetadataResponse metadataResponse = serviceBlockingStub
.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS).getMetadata(metadataRequest.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public static void setup() throws InterruptedException {
flagdContainer.start();
FlagdInProcessSetup.provider = new FlagdProvider(FlagdOptions.builder()
.resolverType(Config.Resolver.IN_PROCESS)
// set a generous deadline, to prevent timeouts in actions
.deadline(3000)
.port(flagdContainer.getFirstMappedPort())
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public static void setup() throws InterruptedException {
FeatureProvider workingProvider = new FlagdProvider(FlagdOptions.builder()
.resolverType(Config.Resolver.IN_PROCESS)
.port(flagdContainer.getFirstMappedPort())
// set a generous deadline, to prevent timeouts in actions
.deadline(3000)
.build());
StepDefinitions.setUnstableProvider(workingProvider);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public static void setup() throws InterruptedException {
FeatureProvider workingProvider = new FlagdProvider(FlagdOptions.builder()
.resolverType(Config.Resolver.RPC)
.port(flagdContainer.getFirstMappedPort())
// set a generous deadline, to prevent timeouts in actions
.deadline(3000)
.cacheType(CacheType.DISABLED.getValue())
.build());
StepDefinitions.setUnstableProvider(workingProvider);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,33 @@ void initialization_fail_with_timeout() throws Exception {
final Cache cache = new Cache("disabled", 0);
final ServiceGrpc.ServiceStub mockStub = mock(ServiceGrpc.ServiceStub.class);
Consumer<ConnectionEvent> onConnectionEvent = mock(Consumer.class);
doAnswer(invocation -> null).when(mockStub).eventStream(any(), any());
doAnswer((InvocationOnMock invocation) -> {
EventStreamObserver eventStreamObserver = (EventStreamObserver) invocation.getArgument(1);
eventStreamObserver
.onError(new Exception("fake"));
return null;
}).when(mockStub).eventStream(any(), any());

try (MockedStatic<ServiceGrpc> mockStaticService = mockStatic(ServiceGrpc.class)) {
mockStaticService.when(() -> ServiceGrpc.newStub(any()))
.thenReturn(mockStub);

// pass true in connected lambda
final GrpcConnector connector = new GrpcConnector(FlagdOptions.builder().build(), cache, () -> {
try {
Thread.sleep(100);
return true;
} catch (Exception e) {
}
return false;

final GrpcConnector connector = new GrpcConnector(FlagdOptions.builder().build(), cache, () -> false,
onConnectionEvent);
},
onConnectionEvent);

// assert throws
assertThrows(RuntimeException.class, connector::initialize);
// assert that onConnectionEvent is not connected
verify(onConnectionEvent).accept(argThat(arg -> !arg.isConnected()));
assertDoesNotThrow(connector::initialize);
// assert that onConnectionEvent is connected
verify(onConnectionEvent).accept(argThat(arg -> !arg.isConnected()));
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ public void connectionParameters() throws Throwable {
final FlagSyncServiceBlockingStub blockingStubMock = mockBlockingStubAndReturn(connector);
final SyncFlagsRequest[] request = new SyncFlagsRequest[1];

when(stubMock.withDeadlineAfter(anyLong(), any())).thenReturn(stubMock);
doAnswer(invocation -> {
request[0] = invocation.getArgument(0, SyncFlagsRequest.class);
return null;
Expand All @@ -58,7 +57,6 @@ public void connectionParameters() throws Throwable {
// when
connector.init();
verify(stubMock, timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any());
verify(stubMock).withDeadlineAfter(1337, TimeUnit.MILLISECONDS);
verify(blockingStubMock).withDeadlineAfter(1337, TimeUnit.MILLISECONDS);

// then
Expand Down Expand Up @@ -87,7 +85,6 @@ public void grpcConnectionStatus() throws Throwable {

final GrpcStreamHandler[] injectedHandler = new GrpcStreamHandler[1];

when(stubMock.withDeadlineAfter(anyLong(), any())).thenReturn(stubMock);
doAnswer(invocation -> {
injectedHandler[0] = invocation.getArgument(1, GrpcStreamHandler.class);
return null;
Expand Down

0 comments on commit 0725139

Please sign in to comment.